MarQS reserve concurrency system & queue priority for resuming/retrying #1715

Merged
merged 9 commits into from
Feb 19, 2025

Conversation

ericallam
Member

@ericallam ericallam commented Feb 19, 2025

This feature improves how our queuing system handles concurrency and priority, especially for triggerAndWait and batchTriggerAndWait.

  • A new reserve concurrency system that allows "freeing" up concurrency slots for runs that are executing but waiting for another task to finish (e.g. triggerAndWait).
  • Special handling of triggerAndWait when the parent and the child share a queue, which can result in a new Recursive Queue Deadlock error.
  • Propagates a "queue timestamp" from the root run to prioritize running runs in a task hierarchy, meaning that runs with ancestor runs will complete faster.
  • When runs resume, or retry, those messages are put in priority queues which take precedence over runs that have not started executing. This will also help finish task run hierarchies faster.
  • Org level concurrency is no longer tracked or used
  • Delayed runs and non-delayed runs are enqueued in the same way now
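
The reserve concurrency idea in the first bullet can be illustrated with a toy model. This is only a sketch under stated assumptions: `ConcurrencyTracker` and its methods are hypothetical names invented for illustration, the state lives in memory rather than in the queue's real backing store, and the actual MarQS accounting may differ.

```typescript
// Toy in-memory model of reserve concurrency. Every name here is hypothetical;
// it is not the MarQS implementation.
class ConcurrencyTracker {
  private running: string[] = []; // run IDs currently holding a slot
  private reserved: string[] = []; // running parents that are waiting on a child

  constructor(private readonly limit: number) {}

  /** Runs that count against the limit: running and not waiting. */
  effectiveConcurrency(): number {
    return this.running.filter((id) => this.reserved.indexOf(id) === -1).length;
  }

  /** Try to start a run. Returns false when the limit is already reached. */
  start(runId: string): boolean {
    if (this.effectiveConcurrency() >= this.limit) return false;
    if (this.running.indexOf(runId) === -1) this.running.push(runId);
    return true;
  }

  /** The parent begins waiting (e.g. in triggerAndWait), so its slot is freed. */
  reserve(runId: string): void {
    const isRunning = this.running.indexOf(runId) !== -1;
    const isReserved = this.reserved.indexOf(runId) !== -1;
    if (isRunning && !isReserved) this.reserved.push(runId);
  }

  /** The run finished: drop it from both lists. */
  complete(runId: string): void {
    this.running = this.running.filter((id) => id !== runId);
    this.reserved = this.reserved.filter((id) => id !== runId);
  }
}

// With a limit of 1, a parent and its child could otherwise block each other.
const tracker = new ConcurrencyTracker(1);
tracker.start("parent"); // parent takes the only slot
const blocked = tracker.start("child"); // no free slot yet
tracker.reserve("parent"); // parent now waits on the child
const admitted = tracker.start("child"); // the parent's slot was freed
```

In this model the waiting parent stops counting against the limit, which is why the child can be admitted even though the limit is 1.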

Summary by CodeRabbit

  • New Features

    • Introduced an admin debug dialog to provide detailed task run insights.
    • Enhanced job status displays with a new “Aborted” state.
    • Expanded API support for retrieving environment and queue concurrency metrics.
    • Integrated priority-based message handling for refined task enqueueing.
  • Bug Fixes

    • Improved error and output handling for task runs.
    • Refined queue management for more consistent task execution.
  • Documentation

    • Updated guides on concurrency management and task execution.
  • Tests

    • Expanded test coverage for new concurrency, prioritization, and administrative features.

changeset-bot bot commented Feb 19, 2025

⚠️ No Changeset found

Latest commit: 84f5563

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Contributor

coderabbitai bot commented Feb 19, 2025

Walkthrough

This pull request introduces several new admin debugging components and updates task and batch status handling. The changes refactor environment configurations (shifting from organization limits to environment limits) and enhance the MarQS concurrency and queue systems with updated key producers and fair dequeue strategies. Numerous service methods have been modified to improve error handling and message priority using the new MarQSPriorityLevel. Additionally, SQL migrations update the schema and enum definitions, and comprehensive tests and documentation updates are provided.
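
As a rough illustration of priority-based message handling, the sketch below orders messages so that resumed or retried runs come first and, within a level, runs with an older queue timestamp come first. It is an assumption-laden sketch: the `"normal" | "elevated"` levels, `QueuedMessage`, and `dequeueOrder` are hypothetical stand-ins, and the real MarQSPriorityLevel values are not shown in this PR text.

```typescript
// Hypothetical message shape; not the real MarQS message payload.
interface QueuedMessage {
  runId: string;
  level: "normal" | "elevated"; // "elevated" stands in for resumed/retried runs
  queueTimestamp: number; // inherited from the root run in a task hierarchy
}

// Elevated messages first; within a level, older queue timestamps first.
function compareMessages(a: QueuedMessage, b: QueuedMessage): number {
  if (a.level !== b.level) return a.level === "elevated" ? -1 : 1;
  return a.queueTimestamp - b.queueTimestamp;
}

// Returns run IDs in the order this toy model would dequeue them.
function dequeueOrder(messages: QueuedMessage[]): string[] {
  return messages
    .slice() // copy so the caller's array is not reordered
    .sort(compareMessages)
    .map((message) => message.runId);
}

const order = dequeueOrder([
  { runId: "fresh", level: "normal", queueTimestamp: 100 },
  { runId: "child", level: "normal", queueTimestamp: 50 }, // older root run
  { runId: "resumed", level: "elevated", queueTimestamp: 200 },
]);
```

Here the resumed run is dequeued before both not-yet-started runs, and the child of the older root run is dequeued before the fresh run.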

Changes

| File(s) | Change Summary |
| --- | --- |
| apps/webapp/app/components/admin/debugRun.tsx, apps/webapp/app/routes/.../spans.$spanParam/route.tsx | Introduced new admin debug components (AdminDebugRun, DebugRunDialog, DebugRunContent, DebugRunData) for conditionally rendering debug run dialogs based on permissions. |
| apps/webapp/app/components/runs/v3/BatchStatus.tsx, apps/webapp/app/presenters/v3/BatchListPresenter.server.ts, apps/webapp/app/presenters/v3/SpanPresenter.server.ts, apps/webapp/app/routes/_app.orgs.../runs.$runParam/route.tsx | Updated batch status logic to include the “ABORTED” state; refactored run output and error handling; made minor UI text and import adjustments. |
| apps/webapp/app/env.server.ts, apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts | Replaced MARQS_MAXIMUM_ORG_COUNT with MARQS_MAXIMUM_ENV_COUNT and added a new loader endpoint with SearchParamsSchema for retrieving environment concurrency metrics. |
| apps/webapp/app/v3/marqs/*, apps/webapp/app/v3/marqs/types.ts | Refactored the MarQS queue system to be environment-centric with updated method signatures, new classes (EnvPriorityDequeuingStrategy), and a revised key producer implementation. |
| apps/webapp/app/v3/services/* | Modified several service methods for batch and task processing: error handling now marks batches as ABORTED and message handling now includes MarQSPriorityLevel in enqueue/replace functions. |
| internal-packages/database/prisma/*, packages/core/src/v3/schemas/common.ts | SQL migrations added the queueTimestamp column and updated the BatchTaskRunStatus enum to include ABORTED; indices on SecretStore were dropped and recreated; new error codes introduced. |
| packages/core/src/v3/errors.ts, packages/core/src/v3/links.ts, packages/core/src/v3/utils/flattenAttributes.ts | Added the taskRunErrorToString function, a new concurrency documentation link, and applied minor formatting improvements. |
| apps/webapp/test/* | Added and updated test suites to verify environment priority dequeue, fair dequeue strategies, key producer functionalities, and concurrency setup utilities. |
| references/test-tasks/*, references/v3-catalog/* | Introduced new project configuration files, comprehensive tests for reserve concurrency, updated trigger tasks and concurrency management flows, and removed legacy tasks. |

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant AdminDebugRun
    participant PermissionChecker
    participant DebugRunDialog
    participant DebugRunContent
    participant DebugRunData

    User->>AdminDebugRun: Request debug run
    AdminDebugRun->>PermissionChecker: Check admin/impersonation
    PermissionChecker-->>AdminDebugRun: Return authorization
    alt Authorized
        AdminDebugRun->>DebugRunDialog: Render dialog
        DebugRunDialog->>DebugRunContent: Load debug data
        DebugRunContent->>DebugRunData: Format and forward run details
        DebugRunData-->>DebugRunContent: Display debug info
    else Not Authorized
        AdminDebugRun-->>User: Render nothing (access denied)
    end
sequenceDiagram
    participant TriggerTaskService
    participant EnqueueRunFunction
    participant MarQS
    participant MessageQueue

    TriggerTaskService->>EnqueueRunFunction: Call enqueueRun with task info
    EnqueueRunFunction->>MarQS: Invoke enqueueMessage (with reserve & priority)
    MarQS->>MessageQueue: Process message enqueue
    MessageQueue-->>MarQS: Acknowledge message
    MarQS-->>EnqueueRunFunction: Return status
    EnqueueRunFunction-->>TriggerTaskService: Return enqueue result

Possibly related PRs

  • Prioritize finishing waited runs #1375: Both PRs change how task runs manage state and timestamps during processing. This PR adds admin debug views gated by user permissions, while #1375 prioritizes and manages the execution of resumed runs.

Suggested reviewers

  • matt-aitken

Poem

I hopped through lines of code with cheer,
Adding admin views so clear,
Queues and tasks now dance in line,
With errors tamed and schema fine.
A rabbit’s joy in every commit,
Leaping forward bit by bit!

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (64)
references/v3-catalog/src/trigger/concurrency.ts (7)

20-51: Check ID references and consider parameter-based configuration.
You changed the task ID to "test-concurrency-controller" and introduced a childDelay parameter. Ensure that any external or downstream references expecting the old ID or missing the new parameter are updated. Also, the hard-coded 2-second wait (line 44) may be better served by a configurable parameter or environment variable if timing is critical.


58-71: Optional error handling for multi-step calls.
The parent task triggers testConcurrencyChild.triggerAndWait, but there is no explicit error handling. If a child task fails, it might be beneficial to handle or log the error to avoid silent failures.


153-164: Consider consistent return format.
You’re returning { completedAt: new Date() }. Other parts of the code sometimes use ISO strings. A unified format (e.g., .toISOString()) can help with downstream parsing.
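
A minimal sketch of the unified format, assuming a hypothetical `TaskResult` shape and `completedNow` helper (neither is from the PR). One reason to prefer the string form: a Date serializes to an ISO string in JSON anyway, so returning the string up front keeps the type the same before and after serialization.

```typescript
// Hypothetical result shape for illustration only.
interface TaskResult {
  completedAt: string; // ISO 8601 timestamp, e.g. "2025-02-19T12:00:00.000Z"
}

// Accepts an optional Date so callers (and tests) can supply a fixed time.
function completedNow(now: Date = new Date()): TaskResult {
  return { completedAt: now.toISOString() };
}

// The value survives a JSON round trip unchanged because it is already a string.
const original = completedNow(new Date(0));
const roundTripped: TaskResult = JSON.parse(JSON.stringify(original));
```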


166-183: Returning a standard timestamp format might help.
Like the controller task, this parent task returns { completedAt: new Date() }. If you want consistent logs across tasks, consider .toISOString().


201-223: Make propagation behavior explicit.
The logic conditionally triggers the grandchild if propagate is undefined or truthy. Consider defaulting propagate to true to make the intended behavior clearer.

-export const testChildTaskPriorityChild = task({
...
-run: async ({ delay = 5000, propagate }: { delay: number; propagate?: boolean }) => {
+run: async ({ delay = 5000, propagate = true }: { delay: number; propagate?: boolean }) => {
...
}

242-259: Optionally unify timestamp format.
Here again, the returned completedAt is a raw date object. If you need consistent time formats across your tasks, consider new Date().toISOString().


265-289: Hierarchical tasks are well-structured.
Your parentTask, subtask, and subsubtask effectively demonstrate a chained sequence within myQueue. Consider optional error handling or try/catch blocks if any step can fail unexpectedly.

apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts (4)

11-13: Use composition over direct assignment for the _delegate property.
To facilitate clearer dependency management, consider injecting _delegate through a parameter in the constructor and then referencing it directly, rather than reassigning in the constructor body. This will simplify testing and ensure consistency in initialization.


15-25: Ensure consistency in naming.
The method name distributeFairQueuesFromParentQueue is descriptive, but double-check it aligns with the broader naming scheme in your codebase (e.g., “distributeQueuesFromParentQueue” or “getFairQueuesFromParentQueue”). Consistent naming helps maintain clarity across similar classes and strategies.


58-63: Potential optimization by combining grouping and priority comparison.
Currently, each queue is grouped, then sorted, and subsequently all results are sorted again by priority. Consider a single pass approach to reduce complexity and overhead.


90-94: Graceful default for missing priority.
Returning zero for a missing priority is reasonable. If you anticipate rare scenarios where zero might be a valid priority and you must differentiate between “no priority” vs. “lowest priority,” consider using negative values or a separate property to distinguish.
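
One way to keep "no priority" distinct from "priority 0" is to return undefined when the key carries no priority suffix and let the caller pick the default. The sketch below assumes the key shape seen in the test strings in this review (for example org:org1:env:env1:queue:queue1:priority:1); `priorityFromQueueKey` is a hypothetical helper, not the PR's method.

```typescript
// Hypothetical parser: returns the trailing priority, or undefined if absent.
// Assumes keys end with ":priority:<digits>" when a priority is present.
function priorityFromQueueKey(queue: string): number | undefined {
  const match = /:priority:(\d+)$/.exec(queue);
  return match ? Number(match[1]) : undefined;
}

// Callers that only need a number can still default explicitly.
function priorityOrDefault(queue: string, fallback: number = 0): number {
  const priority = priorityFromQueueKey(queue);
  return priority === undefined ? fallback : priority;
}
```

With this split, sorting code can use `priorityOrDefault` while diagnostics can still tell an explicit zero from a missing value.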

references/test-tasks/src/utils.ts (4)

6-32: Use exponential backoff when polling run status.
The loop might issue a request every second, which could burden the system under heavy usage. An exponential or incremental backoff strategy can reduce load while waiting for long-running tasks.
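
A minimal sketch of polling with exponential backoff, assuming hypothetical helpers `backoffDelays` and `pollUntil` (the real utility in references/test-tasks/src/utils.ts is not shown here). The delay schedule is computed separately so it can be inspected without waiting.

```typescript
// Delay before each retry: baseMs, baseMs*factor, ... capped at maxMs.
function backoffDelays(
  attempts: number,
  baseMs: number = 1000,
  factor: number = 2,
  maxMs: number = 30000
): number[] {
  const delays: number[] = [];
  for (let i = 0; i < attempts; i++) {
    delays.push(Math.min(maxMs, baseMs * Math.pow(factor, i)));
  }
  return delays;
}

// Calls check() until it returns a value, sleeping between attempts.
// check() should resolve to undefined while the run is still in progress.
async function pollUntil<T>(
  check: () => Promise<T | undefined>,
  delays: number[],
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<T> {
  for (const delay of delays) {
    const result = await check();
    if (result !== undefined) return result;
    await sleep(delay);
  }
  throw new Error("Timed out waiting for the run to finish");
}

const schedule = backoffDelays(5, 1000, 2, 5000);
```

The cap keeps long waits bounded, and the injectable `sleep` makes the loop testable without real timers.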


34-45: Clarify error messages when environment variables are missing.
The existing error messages (“TRIGGER_API_URL is not set”, “TRIGGER_ACCESS_TOKEN is not set”) are simple but might be insufficient if multiple environment variables are expected. Consider providing guidance on how to configure them.


46-70: Improve error handling for concurrency limit updates.
When the response is not OK, the error message references response.statusText. Ensure that deeper error details (e.g., body content) are logged or captured if available. This could help in diagnosing issues on the server side.


84-109: Add caching or memoization for environment stats if calls are frequent.
Fetching environment stats in a tight loop or at high frequency can incur overhead. If the data changes infrequently, consider caching responses for short durations.

apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (3)

68-70: Validate additional search parameters if needed.
The SearchParamsSchema currently allows only the optional queue parameter. If you anticipate future query params, consider preparing a more flexible schema or a separate object for them.


72-93: Leverage consistent authentication checks.
The loader and the action functions share similar authentication logic. If possible, abstract the authentication into a reusable helper to avoid code duplication and ensure uniform security handling across endpoints.


110-145: Augment concurrency data with historical context or external references.
The concurrency snapshot is valuable, but if you anticipate debugging more complex concurrency issues (e.g., capacity planning), consider returning historical usage or referencing external metrics. This can help identify trends over time.

apps/webapp/app/components/admin/debugRun.tsx (3)

34-43: Enhance dialog accessibility.
Consider adding appropriate aria-* attributes for the DialogContent (e.g. aria-labelledby) to ensure screen readers correctly reference the dialog header or label. This can improve overall accessibility.


45-67: Handle potential fetcher errors more robustly.
While there's a fallback message ("Failed to get run debug data"), it might be helpful to log or display specific error information provided by fetcher in case of a network or server error.


69-79: Avoid repeated string prefix logic.
const keys = new MarQSShortKeyProducer("marqs:") and const withPrefix = (key: string) => `marqs:${key}` are slightly redundant. You might centralize the logic within MarQSShortKeyProducer to avoid string concatenation in multiple places.

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts (2)

222-245: Potential performance optimization in #orderQueuesByEnvs.
The method constructs and filters arrays multiple times. If performance becomes a concern with large snapshots, consider a single pass approach or an in-place transformation.


420-461: Validate weighted selection logic in #selectTopEnvs.
The approach to average queue ages and use weighted random selection is interesting, but ensure it aligns with your fairness goals. Large differences in average ages might starve certain environments if random seeds align unfavorably for them.
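
For reference, weighted random selection by average age can be sketched as below. This is not the #selectTopEnvs implementation; `EnvAge` and `weightedPick` are hypothetical, and the random source is injectable so the behavior can be checked deterministically.

```typescript
// Hypothetical input: one entry per environment with its average queue age.
interface EnvAge {
  envId: string;
  avgAge: number; // non-negative weight; larger means older queues
}

// Picks one env with probability proportional to avgAge.
function weightedPick(envs: EnvAge[], random: () => number = Math.random): string {
  if (envs.length === 0) throw new Error("No environments to pick from");
  const total = envs.reduce((sum, env) => sum + env.avgAge, 0);
  // If every weight is zero, fall back to a uniform pick.
  if (total <= 0) return envs[Math.floor(random() * envs.length)].envId;

  let threshold = random() * total;
  for (const env of envs) {
    threshold -= env.avgAge;
    if (threshold < 0) return env.envId;
  }
  // Guard against floating-point drift at the upper edge.
  return envs[envs.length - 1].envId;
}
```

In this scheme any environment with a positive weight keeps a nonzero chance of selection on each draw; if very small weights are a starvation concern, adding a minimum weight per environment is one possible mitigation.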

references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (4)

9-54: Structure long test steps with sub-tests.
describeReserveConcurrencySystem runs multiple complex steps in sequence. To improve readability and debugging, consider splitting it into sub-tests or smaller tasks (e.g., testRetryPriority, testResumePriority) invoked in a top-level orchestrator.


125-173: Ensure freeze/resume logic is fully tested.
testResumePriority asserts that resumed runs complete before newly queued runs. Consider also validating partial progress or re-entrant states if a run is paused multiple times, ensuring robust coverage of the resume mechanism.


367-501: Watch out for potential queue deadlocks with deep recursion.
recursiveTask calls itself via triggerAndWait or batchTriggerAndWait. If depth is large and concurrency is limited, nested calls could exhaust capacity and stall. Consider imposing a maximum recursion depth or concurrency-based backoff.


531-538: Enhance batch error reporting.
unwrapBatchResult only reports the first failing run’s error. If multiple runs fail, it may mask subsequent errors. Aggregating or logging all errors could improve debugging.
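
A sketch of the aggregated variant, assuming a hypothetical `RunResult` shape and an `unwrapBatchResults` helper (the real unwrapBatchResult signature is not shown in this review). It collects every failure before throwing, so no error is masked by the first one.

```typescript
// Hypothetical per-run result shape for illustration.
type RunResult<T> =
  | { ok: true; id: string; output: T }
  | { ok: false; id: string; error: unknown };

// Returns all outputs, or throws one error that lists every failed run.
function unwrapBatchResults<T>(runs: RunResult<T>[]): T[] {
  const outputs: T[] = [];
  const failures: string[] = [];

  for (const run of runs) {
    if (run.ok === false) {
      failures.push(`${run.id}: ${String(run.error)}`);
    } else {
      outputs.push(run.output);
    }
  }

  if (failures.length > 0) {
    throw new Error(
      `${failures.length} of ${runs.length} runs failed:\n${failures.join("\n")}`
    );
  }
  return outputs;
}
```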

apps/webapp/test/fairDequeuingStrategy.test.ts (8)

79-118: Test coverage for reserve concurrency is clear.
These lines add a welcome scenario for environments with reserve concurrency. The logic looks consistent with the new concurrency approach. Consider adding edge cases (e.g., reserveConcurrency at 0 or negative) to validate error conditions.


221-225: Flattening result for speed test.
Using flattenResults helps keep this test logic clear. The performance check is a strong addition. Consider verifying a tolerance rather than a strict “9 times faster” ratio, to avoid random CI noise.


249-250: Performance ratio check.
The ratio-based performance assertion might occasionally be flaky if environment load varies. If that becomes an issue, consider a more robust approach (e.g., mocking or smaller ratio).


794-794: setupConcurrency usage.
You’re setting { currentConcurrency: 0, limit: 5 } for each environment, which is a good baseline for concurrency. Possibly consider an environment with already-exceeded concurrency to test rejection.


831-831: Accumulate environment selections.
Incrementing selection counts is a simple approach for measuring distribution. Ensure you reset these counts if you expand the test scenarios.


835-835: Console debug for analysis.
Logging selected environment counts helps short-term debugging but can clutter test output. Consider conditional logs or a debug flag if logs become noisy.


844-845: Sorting environment frequencies.
Slicing or sorting the map is a handy approach. Make sure large environment sets handle sorting well.


848-850: Top-two environments verification.
Aligning with highest average age is correct. Keep an eye on test flakiness if your distribution algorithm seeds differently in the future.

apps/webapp/app/v3/services/triggerTask.server.ts (1)

567-596: Inline run enqueue with error handling.
Enqueueing runs only if they are “PENDING” is logical. The immediate rollback to SYSTEM_FAILURE if enqueueResult.ok is false prevents half-baked states. Nicely handled.

apps/webapp/app/v3/services/batchTriggerV3.server.ts (1)

459-469: Directly aborting batch on error.
Switching to "ABORTED" status with completedAt avoids repeated re-queues. This is cleaner than retrying indefinitely. If partial retries are needed, consider capturing partial progress before abort.

apps/webapp/app/v3/marqs/index.server.ts (1)

489-489: Remove unnecessary continue.

This continue statement can be safely removed without changing the loop's behavior.

-              continue;
🧰 Tools
🪛 Biome (1.9.4)

[error] 489-489: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

apps/webapp/app/v3/services/enqueueRun.server.ts (2)

31-31: Address the TODO comment about deadlock handling.

The TODO comment indicates that deadlock handling needs to be implemented for reserveConcurrency.

Would you like me to help implement the deadlock handling logic or create an issue to track this task?


32-48: Consider adding retry logic for enqueue operation.

The marqs.enqueueMessage call could benefit from retry logic to handle transient failures.

Here's a suggested implementation using exponential backoff:

-  const wasEnqueued = await marqs.enqueueMessage(
+  const wasEnqueued = await retry(
+    async () => await marqs.enqueueMessage(
       env,
       run.queue,
       run.id,
       {
         type: "EXECUTE",
         taskIdentifier: run.taskIdentifier,
         projectId: env.projectId,
         environmentId: env.id,
         environmentType: env.type,
       },
       run.concurrencyKey ?? undefined,
       run.queueTimestamp ?? undefined,
       dependentRun
         ? { messageId: dependentRun.id, recursiveQueue: dependentRun.queue === run.queue }
         : undefined
+    ),
+    {
+      retries: 3,
+      minTimeout: 1000,
+      factor: 2,
+    }
   );
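
The `retry` function in the suggestion above is not defined in the shown diff. A minimal sketch of a helper that accepts the same option names (`retries`, `minTimeout`, `factor`) might look like the following; the helper and its `sleep` option are hypothetical, and it only retries when the wrapped function throws, so a call that reports failure through its return value would need to be adapted.

```typescript
// Hypothetical options, mirroring the names used in the suggestion.
interface RetryOptions {
  retries: number; // number of retries after the first attempt
  minTimeout: number; // delay before the first retry, in milliseconds
  factor: number; // multiplier applied to the delay after each failure
  sleep?: (ms: number) => Promise<void>; // injectable for tests
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs fn, retrying on thrown errors with exponentially growing delays.
async function retry<T>(fn: () => Promise<T>, options: RetryOptions): Promise<T> {
  const sleep = options.sleep ?? defaultSleep;
  let lastError: unknown;

  for (let attempt = 0; attempt <= options.retries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < options.retries) {
        await sleep(options.minTimeout * Math.pow(options.factor, attempt));
      }
    }
  }
  throw lastError;
}
```

Retrying an enqueue is only safe if the operation is idempotent, which this sketch does not verify.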
apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (2)

16-38: Consider adding error handling for database query timeouts.

The database query could benefit from timeout handling to prevent long-running queries.

+  const QUERY_TIMEOUT_MS = 5000;
+  const timeoutPromise = new Promise((_, reject) => 
+    setTimeout(() => reject(new Error('Query timeout')), QUERY_TIMEOUT_MS)
+  );
+
+  const run = await Promise.race([
   $replica.taskRun.findFirst({
     where: { friendlyId: runParam, project: { organization: { members: { some: { userId } } } } },
     select: {
       // ... existing select fields
     },
   }),
+    timeoutPromise
+  ]).catch(error => {
+    if (error.message === 'Query timeout') {
+      throw new Response("Query timeout", { status: 504 });
+    }
+    throw error;
+  });
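
A reusable variant of the timeout race can also clear the timer once the race settles, so a pending timeout does not linger after a fast query. This is a sketch with a hypothetical `withTimeout` helper; note that racing does not cancel the underlying query, it only stops waiting for it.

```typescript
// Rejects with `message` if `promise` does not settle within `ms` milliseconds.
function withTimeout<T>(
  promise: Promise<T>,
  ms: number,
  message: string = "Query timeout"
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms);
  });

  // Clear the timer on both outcomes so it never outlives the race.
  return Promise.race([promise, timeout]).then(
    (value) => {
      clearTimeout(timer);
      return value;
    },
    (error) => {
      clearTimeout(timer);
      throw error;
    }
  );
}
```

A caller could then map the timeout error to a 504 response, as the suggestion above does.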

44-62: Consider batching concurrency metric requests.

The multiple sequential calls to marqs for fetching concurrency metrics could be optimized by batching them together.

+  const [
+    queueConcurrencyLimit,
+    envConcurrencyLimit,
+    queueCurrentConcurrency,
+    envCurrentConcurrency,
+    queueReserveConcurrency,
+    envReserveConcurrency
+  ] = await Promise.all([
+    marqs.getQueueConcurrencyLimit(run.runtimeEnvironment, run.queue),
+    marqs.getEnvConcurrencyLimit(run.runtimeEnvironment),
+    marqs.currentConcurrencyOfQueue(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined),
+    marqs.currentConcurrencyOfEnvironment(run.runtimeEnvironment),
+    marqs.reserveConcurrencyOfQueue(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined),
+    marqs.reserveConcurrencyOfEnvironment(run.runtimeEnvironment)
+  ]);
apps/webapp/app/components/runs/v3/BatchStatus.tsx (1)

62-88: Consider adding a11y attributes for status colors.

The status colors should include ARIA attributes for better accessibility.

 export function BatchStatusIcon({
   status,
   className,
+  ariaLabel,
 }: {
   status: BatchTaskRunStatus;
   className: string;
+  ariaLabel?: string;
 }) {
   switch (status) {
     case "PENDING":
-      return <Spinner className={cn(batchStatusColor(status), className)} />;
+      return <Spinner className={cn(batchStatusColor(status), className)} aria-label={ariaLabel ?? batchStatusTitle(status)} />;
     case "COMPLETED":
-      return <CheckCircleIcon className={cn(batchStatusColor(status), className)} />;
+      return <CheckCircleIcon className={cn(batchStatusColor(status), className)} aria-label={ariaLabel ?? batchStatusTitle(status)} />;
     case "ABORTED":
-      return <XCircleIcon className={cn(batchStatusColor(status), className)} />;
+      return <XCircleIcon className={cn(batchStatusColor(status), className)} aria-label={ariaLabel ?? batchStatusTitle(status)} />;
apps/webapp/app/v3/services/enqueueDelayedRun.server.ts (1)

37-67: Consider optimizing the nested includes in the query.

The deeply nested includes in the query could impact performance. Consider using a more targeted query structure.

-  include: {
-    dependency: {
-      include: {
-        dependentBatchRun: {
-          include: {
-            dependentTaskAttempt: {
-              include: {
-                taskRun: true,
-              },
-            },
-          },
-        },
-        dependentAttempt: {
-          include: {
-            taskRun: true,
-          },
-        },
-      },
-    },
-  },
+  select: {
+    dependency: {
+      select: {
+        dependentBatchRun: {
+          select: {
+            dependentTaskAttempt: {
+              select: {
+                taskRun: {
+                  select: {
+                    id: true,
+                    queue: true,
+                  },
+                },
+              },
+            },
+          },
+        },
+        dependentAttempt: {
+          select: {
+            taskRun: {
+              select: {
+                id: true,
+                queue: true,
+              },
+            },
+          },
+        },
+      },
+    },
+  },
apps/webapp/app/v3/marqs/types.ts (1)

110-113: Add documentation for the recursiveQueue flag.

Please add JSDoc comments explaining the purpose and behavior of the recursiveQueue flag in EnqueueMessageReserveConcurrencyOptions.

 export type EnqueueMessageReserveConcurrencyOptions = {
+  /** ID of the dependent run that is waiting on this message (dependentRun.id at the enqueueRun call site) */
   messageId: string;
+  /** True when the dependent run and the enqueued run share a queue (dependentRun.queue === run.queue) */
   recursiveQueue: boolean;
 };
apps/webapp/test/utils/marqs.ts (1)

57-88: Add detailed documentation about the dummy jobs approach.

While the implementation is clean, it would be helpful to add more detailed documentation explaining why dummy jobs are used to simulate concurrency and reserved concurrency.

 /**
  * Sets up concurrency-related Redis keys for orgs and envs
+ * 
+ * @remarks
+ * This function uses dummy job IDs to simulate both current and reserved concurrency.
+ * Dummy jobs are used because... [explain the rationale]
+ * 
+ * @param options - The options for setting up concurrency
+ * @param options.redis - The Redis instance
+ * @param options.keyProducer - The key producer instance
+ * @param options.env - The environment configuration
+ * @param options.env.id - The environment ID
+ * @param options.env.currentConcurrency - The number of concurrent jobs to simulate
+ * @param options.env.limit - Optional concurrency limit
+ * @param options.env.reserveConcurrency - Optional number of reserved concurrent jobs
  */
apps/webapp/app/v3/services/resumeTaskDependency.server.ts (2)

53-72: Improve the TODO comment.

The current TODO comment is informal and lacks clarity. Please update it to better describe what needs to be done.

-      // TODO: use the new priority queue thingie
+      // TODO: Implement priority queue system for task dependency resumption
+      // This should handle message priorities using MarQSPriorityLevel

92-106: Document the timestamp fallback logic.

Please add a comment explaining why we fall back to createdAt when queueTimestamp is not available.

+      // Fall back to createdAt if queueTimestamp is not available
+      // This ensures consistent message ordering while maintaining backward compatibility
       await marqs?.replaceMessage(
apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2)

23-312: Add JSDoc comments for better documentation.

Consider adding JSDoc comments to describe the purpose, parameters, and return values of public methods. This would improve code maintainability and developer experience.

Example for the queueKey method:

+/**
+ * Generates a queue key based on the provided parameters.
+ * @param envOrOrgId - The environment object or organization ID
+ * @param queueOrEnvId - The queue name or environment ID
+ * @param queueOrConcurrencyKey - The queue name or concurrency key
+ * @param concurrencyKeyOrPriority - Optional concurrency key or priority
+ * @param priority - Optional priority value
+ * @returns The generated queue key
+ */
 queueKey(
   envOrOrgId: MarQSKeyProducerEnv | string,
   queueOrEnvId: string,
   queueOrConcurrencyKey: string,
   concurrencyKeyOrPriority?: string | number,
   priority?: number
 ): string {

283-285: Add input validation for priority values.

The prioritySection method should validate that the priority value is a non-negative integer to prevent potential issues.

 private prioritySection(priority: number) {
+  if (!Number.isInteger(priority) || priority < 0) {
+    throw new Error(`Invalid priority value: ${priority}. Must be a non-negative integer.`);
+  }
   return `${constants.PRIORITY_PART}:${priority}`;
 }
apps/webapp/app/v3/services/createCheckpoint.server.ts (1)

306-306: Consider removing redundant undefined argument.

The explicit undefined argument is redundant as it's the default value when no argument is provided.

-            undefined,
apps/webapp/test/marqsKeyProducer.test.ts (3)

10-14: Consider extracting test data setup to a shared helper.

The test data setup for TestDelegate could be moved to a shared helper function to improve reusability and maintainability.

+class TestDelegate implements MarQSFairDequeueStrategy {
+  static createWithQueues(queues: EnvQueues[]): TestDelegate {
+    return new TestDelegate(queues);
+  }
+
   constructor(private queues: EnvQueues[]) {}

   async distributeFairQueuesFromParentQueue(): Promise<Array<EnvQueues>> {
     return this.queues;
   }
 }

158-162: Add test cases for invalid inputs.

Consider adding test cases for invalid queue strings to ensure robust error handling.

 it("should throw error for invalid queue string", () => {
   const invalidQueue = "invalid:queue:string";
   expect(() => producer.queueDescriptorFromQueue(invalidQueue)).toThrow("Invalid queue");
 });
+
+it("should throw error for empty queue string", () => {
+  expect(() => producer.queueDescriptorFromQueue("")).toThrow("Invalid queue");
+});
+
+it("should throw error for malformed queue string", () => {
+  expect(() => producer.queueDescriptorFromQueue("org:123:env:456:invalid:testQueue"))
+    .toThrow("Invalid queue");
+});

182-190: Improve test description for stable sort.

The test description could be more explicit about what is being tested.

-// Check that queue2 and queue4 (priority 1) maintain their relative order
-// and queue1, queue3, and queue5 (priority 0) maintain their relative order
+// Verify that:
+// 1. Higher priority queues (queue2, queue4 with priority 1) are sorted first
+// 2. Within the same priority level, the original order is preserved
+// 3. Non-priority queues (queue1, queue3, queue5) maintain their relative order at the end
apps/webapp/test/envPriorityDequeueingStrategy.test.ts (2)

19-40: Extract test data setup to a shared helper.

Consider extracting common test data setup to reduce duplication and improve maintainability.

+const createTestEnvQueues = (envId: string, queues: string[]): EnvQueues => ({
+  envId,
+  queues,
+});
+
+const createTestQueue = (
+  orgId: string,
+  envId: string,
+  queueName: string,
+  priority?: number,
+  concurrencyKey?: string
+): string => {
+  let queue = `org:${orgId}:env:${envId}:queue:${queueName}`;
+  if (concurrencyKey) queue += `:ck:${concurrencyKey}`;
+  if (priority !== undefined) queue += `:priority:${priority}`;
+  return queue;
+};

 const inputQueues: EnvQueues[] = [
   {
     envId: "env1",
     queues: [
-      "org:org1:env:env1:queue:queue1:priority:1",
-      "org:org1:env:env1:queue:queue2:priority:1",
-      "org:org1:env:env1:queue:queue3:priority:1",
+      createTestQueue("org1", "env1", "queue1", 1),
+      createTestQueue("org1", "env1", "queue2", 1),
+      createTestQueue("org1", "env1", "queue3", 1),
     ],
   },
 ];

193-225: Add test cases for additional edge cases.

Consider adding test cases for more edge cases to ensure robust behavior.

+it("should handle queues with extremely large negative priorities", async () => {
+  const inputQueues: EnvQueues[] = [
+    {
+      envId: "env1",
+      queues: [
+        "org:org1:env:env1:queue:queue1:priority:-9999999",
+        "org:org1:env:env1:queue:queue2:priority:1",
+      ],
+    },
+  ];
+
+  const delegate = new TestDelegate(inputQueues);
+  const strategy = new EnvPriorityDequeuingStrategy({
+    delegate,
+    keys: keyProducer,
+  });
+
+  const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1");
+
+  expect(result[0].queues).toEqual([
+    "org:org1:env:env1:queue:queue2:priority:1",
+    "org:org1:env:env1:queue:queue1:priority:-9999999",
+  ]);
+});
+
+it("should handle queues with malformed priority values", async () => {
+  const inputQueues: EnvQueues[] = [
+    {
+      envId: "env1",
+      queues: [
+        "org:org1:env:env1:queue:queue1:priority:invalid",
+        "org:org1:env:env1:queue:queue2:priority:1",
+      ],
+    },
+  ];
+
+  const delegate = new TestDelegate(inputQueues);
+  const strategy = new EnvPriorityDequeuingStrategy({
+    delegate,
+    keys: keyProducer,
+  });
+
+  const result = await strategy.distributeFairQueuesFromParentQueue("parentQueue", "consumer1");
+
+  // Expect malformed priority to be treated as priority 0
+  expect(result[0].queues).toEqual([
+    "org:org1:env:env1:queue:queue2:priority:1",
+    "org:org1:env:env1:queue:queue1:priority:invalid",
+  ]);
+});
apps/webapp/app/env.server.ts (1)

360-360: Document the new environment limit configuration.

The new MARQS_MAXIMUM_ENV_COUNT configuration needs documentation to explain its purpose and impact.

+  /** Maximum number of environments the fair dequeuing strategy selects per dequeue pass.
+   * Replaces the previous organization-based limit (MARQS_MAXIMUM_ORG_COUNT).
+   * @example 100
+   */
   MARQS_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
apps/webapp/app/v3/services/completeAttempt.server.ts (1)

467-482: Extract retry queue logic to a separate method.

The retry queue logic is complex and could be extracted to improve readability and maintainability.

+  private async retryViaQueue(
+    runId: string,
+    taskIdentifier: string,
+    checkpointEventId: string | undefined,
+    retryTimestamp: number
+  ) {
+    logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId });
+
+    return marqs?.replaceMessage(
+      runId,
+      {
+        type: "EXECUTE",
+        taskIdentifier,
+        checkpointEventId: this.opts.supportsRetryCheckpoints ? checkpointEventId : undefined,
+        retryCheckpointsDisabled: !this.opts.supportsRetryCheckpoints,
+      },
+      retryTimestamp,
+      MarQSPriorityLevel.retry
+    );
+  }

   const retryViaQueue = () => {
-    logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id });
-
-    // We have to replace a potential RESUME with EXECUTE to correctly retry the attempt
-    return marqs?.replaceMessage(
-      run.id,
-      {
-        type: "EXECUTE",
-        taskIdentifier: run.taskIdentifier,
-        checkpointEventId: this.opts.supportsRetryCheckpoints ? checkpointEventId : undefined,
-        retryCheckpointsDisabled: !this.opts.supportsRetryCheckpoints,
-      },
-      executionRetry.timestamp,
-      MarQSPriorityLevel.retry
-    );
+    return this.retryViaQueue(
+      run.id,
+      run.taskIdentifier,
+      checkpointEventId,
+      executionRetry.timestamp
+    );
   };
internal-packages/database/prisma/migrations/20250210164232_add_queue_timestamp_to_run/migration.sql (1)

4-6: Adding the queueTimestamp Column to TaskRun

A new column "queueTimestamp" (of type TIMESTAMP(3)) is added to the "TaskRun" table. This field will be useful for tracking when tasks are queued. Please verify whether this column should be nullable (as no default is provided) and confirm that downstream services and the Prisma schema are updated to handle a potential null value.

docs/queue-concurrency.mdx (4)

10-14: ‘Default Concurrency’ Section Added

The new “## Default concurrency” header and its accompanying text effectively describe that by default each task may run with unbounded concurrency (subject to the environment-level limits). The phrase “fill up the entire concurrency limit” is vivid; just be sure that the intended meaning is evident to new users.


257-257: Stylistic Suggestion on Intensifiers

The phrase “It’s very important to note…” appears at around line 257. Consider replacing “very important” with “important” to reduce overuse of intensifiers and improve style.

🧰 Tools
🪛 LanguageTool

[style] ~257-~257: As an alternative to the over-used intensifier ‘very’, consider replacing this phrase.
Context: ...e parent task slot is reoccupied. It's very important to note that we only release at-most X ...

(EN_WEAK_ADJECTIVE)


70-70: Typographical Correction Suggestion

A static analysis hint suggests inserting a comma for better readability on a sentence around line 70. Please review the sentence starting “When you trigger a task…” and consider adding a comma (e.g. after a leading clause) to improve clarity.

🧰 Tools
🪛 LanguageTool

[typographical] ~70-~70: Consider inserting a comma for improved readability.
Context: ...u trigger a run When you trigger a task you can override the concurrency limit. Thi...

(INITIAL_ADVP_COMMA)


219-219: Comma After “for example”

A LanguageTool hint flags that after “for example” a comma is usually expected. Adding this comma can improve the sentence flow in that portion of the text.

🧰 Tools
🪛 LanguageTool

[typographical] ~219-~219: After the expression ‘for example’ a comma is usually used.
Context: ...yload) => { //... }, }); ``` For example purposes, let's say the environment con...

(COMMA_FOR_EXAMPLE)

internal-packages/database/prisma/schema.prisma (1)

1734-1735: New queueTimestamp Field in TaskRun Model

A new optional field queueTimestamp DateTime? is added to the TaskRun model. This field will allow the system to record when a run is queued, which supports the new concurrency/reservation functionality. Ensure that application logic (and any queries/sorts) referencing task timing is updated accordingly.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d6c869e and d4b1083.

⛔ Files ignored due to path filters (2)
  • docs/images/recursive-task-deadlock-min.png is excluded by !**/*.png
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (47)
  • apps/webapp/app/components/admin/debugRun.tsx (1 hunks)
  • apps/webapp/app/components/runs/v3/BatchStatus.tsx (4 hunks)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/BatchListPresenter.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts (4 hunks)
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam/route.tsx (3 hunks)
  • apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (2 hunks)
  • apps/webapp/app/routes/projects.v3.$projectRef.metrics/registerProjectMetrics.server.ts (0 hunks)
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx (2 hunks)
  • apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (1 hunks)
  • apps/webapp/app/v3/eventRepository.server.ts (5 hunks)
  • apps/webapp/app/v3/marqs/concurrencyMonitor.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts (12 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (25 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts (0 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.ts (1 hunks)
  • apps/webapp/app/v3/marqs/types.ts (2 hunks)
  • apps/webapp/app/v3/marqs/v2.server.ts (1 hunks)
  • apps/webapp/app/v3/services/batchTriggerV3.server.ts (1 hunks)
  • apps/webapp/app/v3/services/completeAttempt.server.ts (3 hunks)
  • apps/webapp/app/v3/services/createCheckpoint.server.ts (3 hunks)
  • apps/webapp/app/v3/services/enqueueDelayedRun.server.ts (3 hunks)
  • apps/webapp/app/v3/services/enqueueRun.server.ts (1 hunks)
  • apps/webapp/app/v3/services/resumeBatchRun.server.ts (5 hunks)
  • apps/webapp/app/v3/services/resumeTaskDependency.server.ts (4 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (7 hunks)
  • apps/webapp/test/envPriorityDequeueingStrategy.test.ts (1 hunks)
  • apps/webapp/test/fairDequeuingStrategy.test.ts (13 hunks)
  • apps/webapp/test/marqsKeyProducer.test.ts (1 hunks)
  • apps/webapp/test/utils/marqs.ts (3 hunks)
  • docs/queue-concurrency.mdx (6 hunks)
  • internal-packages/database/prisma/migrations/20250210164232_add_queue_timestamp_to_run/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20250211150836_add_aborted_batch_task_run_status/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (2 hunks)
  • packages/core/src/v3/errors.ts (4 hunks)
  • packages/core/src/v3/links.ts (1 hunks)
  • packages/core/src/v3/schemas/common.ts (1 hunks)
  • packages/core/src/v3/utils/flattenAttributes.ts (2 hunks)
  • references/test-tasks/package.json (1 hunks)
  • references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (1 hunks)
  • references/test-tasks/src/utils.ts (1 hunks)
  • references/test-tasks/trigger.config.ts (1 hunks)
  • references/test-tasks/tsconfig.json (1 hunks)
  • references/v3-catalog/src/trigger/concurrency.ts (3 hunks)
  • references/v3-catalog/src/trigger/simple.ts (1 hunks)
  • references/v3-catalog/trigger.config.ts (1 hunks)
💤 Files with no reviewable changes (2)
  • apps/webapp/app/routes/projects.v3.$projectRef.metrics/registerProjectMetrics.server.ts
  • apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
✅ Files skipped from review due to trivial changes (3)
  • references/test-tasks/tsconfig.json
  • packages/core/src/v3/utils/flattenAttributes.ts
  • references/test-tasks/package.json
🧰 Additional context used
🪛 Biome (1.9.4)
apps/webapp/app/v3/marqs/index.server.ts

[error] 489-489: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

🔇 Additional comments (96)
references/v3-catalog/trigger.config.ts (1)

22-31: LGTM! Disabling retries in dev environment.

This change simplifies the development workflow by providing immediate failure feedback without retries. It also helps isolate and test the new MarQS queue priority system by removing retry-related complexity during development.

Please confirm this change is intentional as part of the MarQS concurrency system updates. While it improves the development experience, ensure it aligns with your testing strategy for the new queue priority implementation.

references/v3-catalog/src/trigger/concurrency.ts (6)

1-1: Import statement looks good.
No issues spotted with the updated import from the @trigger.dev/sdk/v3.


73-89: Concurrency limit for child tasks looks solid.
Allowing concurrency up to 10 here is clear for load testing scenarios. This is presumably intentional for your concurrency tests.


185-199: Batch-trigger usage looks appropriate.
Your usage of batchTrigger to create multiple child tasks is straightforward. Just confirm your concurrency or priority needs are satisfied if these child tasks run in parallel.


225-240: Straightforward grandchild creation.
Batch-triggering three tasks ensures a controlled test scenario. This is clear and well-structured.


260-263: Dedicated queue definition is clear.
Using myQueue with concurrencyLimit: 1 is a simple, effective way to serialize tasks.


91-151:

❓ Verification inconclusive

Verify recursion depth to avoid unexpected issues.
This task recursively triggers itself when currentDepth < depth. Large or negative values for depth could introduce performance overhead or edge-case bugs.


🏁 Script executed:

#!/bin/bash
# Description: Search for any external usage of testReserveConcurrencyRecursiveWaits that might pass problematic depth values.
rg -A 5 'testReserveConcurrencyRecursiveWaits\.trigger|testReserveConcurrencyRecursiveWaits\.batchTrigger'

Length of output: 1132


Action: Review the recursion depth validation logic

The current recursive invocation in testReserveConcurrencyRecursiveWaits simply increments the currentDepth without any explicit validation of the depth parameter. While our search confirms that only internal calls are made (passing the unchanged depth), it’s important to ensure that:

  • Only valid depth values (i.e., non-negative and within a reasonable limit) are allowed.
  • Future external modifications don’t inadvertently pass problematic values that could lead to deep recursion or performance issues.

Please consider adding input validation or an upper bound check on the depth parameter to safeguard against unexpected behavior.
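
A minimal sketch of such a guard follows. The helper name `assertValidDepth` and the `MAX_DEPTH` bound of 10 are hypothetical and not part of this PR; the task would call the guard before recursing.

```typescript
// Hypothetical upper bound; choose a value that suits the test scenario.
const MAX_DEPTH = 10;

// Throws when depth is not an integer in the range [0, maxDepth].
function assertValidDepth(depth: number, maxDepth: number = MAX_DEPTH): void {
  if (!Number.isInteger(depth) || depth < 0 || depth > maxDepth) {
    throw new RangeError(
      `Invalid depth: ${depth}. Expected an integer between 0 and ${maxDepth}.`
    );
  }
}
```

Failing fast with a RangeError keeps a bad payload from fanning out into many child runs before the problem is noticed.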

apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts (1)

33-36: Verify sorting stability for identical priorities.
The comment notes that queues with the same priority should keep their original order. Array.prototype.sort has been required to be stable since ECMAScript 2019, but engines that predate that requirement (for example, Node.js before version 11) do not guarantee it. Confirm that your runtime meets this requirement, or add an explicit tiebreaker to the comparator to ensure correct behavior.
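
If relying on engine behavior is a concern, one option is to make the tiebreak explicit. The sketch below is illustrative only: `priorityOf` and `sortByPriorityStable` are hypothetical names, and the parser assumes the `:priority:N` suffix seen in the test queue strings, treating a missing or malformed priority as 0.

```typescript
// Hypothetical parser: reads a trailing ":priority:N" segment, defaulting to 0.
function priorityOf(queue: string): number {
  const match = queue.match(/:priority:(-?\d+)$/);
  return match ? Number(match[1]) : 0;
}

// Sorts by descending priority. Ties fall back to the original index,
// so the result does not depend on the engine's sort being stable.
function sortByPriorityStable(queues: string[]): string[] {
  return queues
    .map((queue, index) => ({ queue, index, priority: priorityOf(queue) }))
    .sort((a, b) => b.priority - a.priority || a.index - b.index)
    .map((entry) => entry.queue);
}
```

Carrying the original index costs one extra allocation per queue, which is usually negligible for the list sizes a dequeue pass handles.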

apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (1)

94-108: Confirm environment ownership checks if needed.
While you verify environment existence, confirm you don’t need to check user ownership or additional invariants for read access. Depending on platform rules, verifying admin status may be enough, but some contexts require environment-specific ownership checks.

apps/webapp/app/components/admin/debugRun.tsx (1)

14-21: Validate conditions for admin or impersonating access.
Currently, if hasAdminAccess is false but isImpersonating is true, the component is still rendered. Please confirm that impersonation alone should allow access to debug functionality, as this may expose sensitive data.

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts (3)

6-7: Check type usage for EnvQueues.
The newly imported EnvQueues type is used throughout the class. Ensure all references match the updated structure (e.g., .queues array) and that downstream code expects these new return types.


111-114: Ensure new return type is handled correctly.
distributeFairQueuesFromParentQueue now returns Promise<Array<EnvQueues>> instead of Promise<Array<string>>. Validate that all callers properly handle the more complex return structure, especially if they previously expected a list of queue IDs.
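
For callers that still need a flat list of queue IDs, a small adapter may be enough. This is a sketch under the assumption that EnvQueues has the `{ envId, queues }` shape used in the tests; `flattenEnvQueues` is a hypothetical name.

```typescript
// Shape assumed from the tests: one entry per environment with its queues.
type EnvQueues = { envId: string; queues: string[] };

// Flattens the per-environment result into a flat list of queue IDs,
// preserving environment order and the queue order within each environment.
function flattenEnvQueues(envQueues: EnvQueues[]): string[] {
  return envQueues.reduce<string[]>((acc, env) => acc.concat(env.queues), []);
}
```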


466-480: Check concurrency edge cases.
#getEnvConcurrency merges three values (current, limit, and reserve). Verify that negative or zero concurrency values are handled safely to avoid potential undefined behavior in higher-level logic.
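
One defensive option is to normalize the three values before they reach higher-level logic. The sketch below is not the PR's implementation: the `EnvConcurrency` shape and both helper names are hypothetical, and it assumes the reserve extends the limit.

```typescript
// Hypothetical shape for the three values mentioned above.
type EnvConcurrency = { current: number; limit: number; reserve: number };

// Coerces non-finite, zero, or negative inputs to 0 and truncates fractions.
function sanitizeCount(value: number): number {
  return Number.isFinite(value) && value > 0 ? Math.floor(value) : 0;
}

// Remaining capacity, never negative.
// Assumption: the reserve is added on top of the limit.
function availableCapacity(raw: EnvConcurrency): number {
  const current = sanitizeCount(raw.current);
  const limit = sanitizeCount(raw.limit);
  const reserve = sanitizeCount(raw.reserve);
  return Math.max(0, limit + reserve - current);
}
```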

references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (3)

56-123: Verify concurrency capacity before test.
testRetryPriority triggers multiple runs that rely on concurrency limits. Verify the initial concurrency state (such as queue capacity) before the test runs, to prevent false positives when concurrency has already been exhausted by other tests in a large suite.


175-233: Handle development vs. production environment logic carefully.
There's a conditional on line 194 checking ctx.environment.type !== "DEVELOPMENT". Ensure test coverage for both scenarios or unify the logic so that local and production runs follow consistent concurrency patterns.


319-365: Ensure concurrency for nested triggers.
testQueueReserveConcurrency triggers recursive tasks and expects them to succeed or fail in a certain order. If new concurrency constraints are introduced (like partial concurrency usage by other tests), the outcome may differ. Confirm each test is isolated.

apps/webapp/test/fairDequeuingStrategy.test.ts (16)

11-11: Import looks good.
This new import of EnvQueues nicely clarifies the return structure in your tests.


18-18: Reflect new naming in test description.
Renaming the test to reflect environment-centric approach is consistent with your overall refactor from org-based concurrency to env-based concurrency.


42-44: Check object shape for correctness.
By returning an object with both envId and queues, the test now aligns with the new EnvQueues structure. Make sure all relevant downstream tests and asserts follow the same structure.


166-166: Confirm zero-result scenario is expected.
A length check of expect(result).toHaveLength(1) is consistent with your parentQueueLimit logic. Ensure you have a negative or edge test for parentQueueLimit = 0 to confirm the code handles that gracefully.


169-172: Validation on partial queue selection.
Selecting exactly two queues and returning them in the expected shape verifies partial selection. Looks good for coverage.


745-757: maximumEnvCount validation is consistent.
Renaming from “max org count” to “maximumEnvCount” clarifies intent. Ensure you have also tested scenarios where no environment meets the average queue age threshold, and where many do.


761-790: Clear environment-based queue profiles.
Defining each envId with different average ages is a clean approach to verifying environment prioritization. The logic for selecting top environments by age is correct.


806-808: Unique queue identifiers.
Appending -i to queueId ensures each queue is distinct. This helps reveal any concurrency or ID collisions. Nicely done.


816-816: DistributeFairQueuesFromParentQueue usage.
You pass "consumer-${i}" as the consumer ID. Using a distinct ID per iteration simulates multiple consumers, which is a good way to test distribution.


825-825: Set usage is neat.
Using a Set for selected environments is straightforward and ensures no duplicates. This logic is well-structured.


828-828: Maximum environment count check.
toBeLessThanOrEqual(2) aligns with the newly introduced maximumEnvCount. Looks correct.


838-838: Heuristic priority.
Checking that env-2 is more frequently chosen than env-4 confirms the age-based priority. Great direct test.


841-842: Lower-tier environment selection check.
Asserting that env-4 is selected less often than env-2 confirms that the environment with the lowest average queue age is rarely chosen. This is consistent.


854-855: Summation logic.
Combining total selections is good for computing percentages. No issues noted.


864-868: Percentage thresholds.
Explicit numeric checks (> 40%, < 20%) are clear acceptance criteria. Double-check test reliability across multiple runs.


874-876: flattenResults helper is succinct.
This helper makes the final array usage straightforward. Nicely encapsulated.

apps/webapp/app/v3/services/triggerTask.server.ts (12)

3-3: New import is consistent.
packetRequiresOffloading is a useful helper for large payloads. Ensure the threshold logic is tested.


6-7: Enhanced error handling.
Introducing taskRunErrorEnhancer and taskRunErrorToString extends clarity for debugging failures. Great addition.


10-10: parseNaturalLanguageDuration import.
Parsing durations in natural language reduces confusion for end users. Good to see.


13-14: Queue utilities.
Imports for sanitizeQueueName and createTag reflect better queue naming & dynamic tagging. Keep an eye on any naming collisions.


17-19: Consolidated environment checks.
You’re importing platform checks (getEntitlement) and more advanced logging. This centralizes environment-based validations.


26-27: Supplementary imports for concurrency checks.
isFinalAttemptStatus and isFinalRunStatus unify the run termination logic. Good synergy with uploadPacketToObjectStore.


30-30: BaseService synergy.
Extending BaseService and using ServiceValidationError ensures consistent error handling across the codebase.


32-33: Queue-run synergy.
Introducing enqueueRun and ExpireEnqueuedRunService highlights a flexible approach to queue management. Great for scheduling tasks right away or with a TTL.


191-192: Loading queueTimestamp from dependent attempt.
Including queueTimestamp in the select helps maintain consistent ordering and timestamps across dependent runs.


303-303: Tracing event via eventRepository.
Wrapping the entire creation logic in traceEvent is consistent with your telemetry approach. Keep trace spans short to avoid performance overhead.


376-380: Queue timestamp fallback logic.
Chaining the dependent attempts’ queueTimestamp with the delayUntil or new Date() ensures stable fallback. This is robust.
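
The fallback chain described here can be sketched as follows. The input names are hypothetical and mirror the review text rather than the actual service code.

```typescript
// Hypothetical inputs; field names are illustrative only.
type QueueTimestampInputs = {
  parentQueueTimestamp?: Date | null; // inherited from the dependent attempt's run, if any
  delayUntil?: Date | null; // set for delayed runs
};

// Returns the first defined value in the chain.
// `now` is injectable so the function stays deterministic in tests.
function resolveQueueTimestamp(
  inputs: QueueTimestampInputs,
  now: Date = new Date()
): Date {
  return inputs.parentQueueTimestamp ?? inputs.delayUntil ?? now;
}
```

Using nullish coalescing rather than `||` means only null and undefined trigger the fallback.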


602-607: Exception path refinement.
Throwing a ServiceValidationError from the result error ensures correct trace updating. The final return block properly distinguishes runs from errors. Good clarity.

Also applies to: 609-611, 613-616

apps/webapp/app/v3/marqs/index.server.ts (1)

1483-1485: Verify the combined concurrency logic.

Adding envReserveConcurrency to envConcurrencyLimit effectively increases the total concurrency. Verify that this is the intended behavior and won't allow tasks to exceed claimed resource limits.

Would you like a script to audit whether any environment concurrency has exceeded intended thresholds historically?

references/test-tasks/trigger.config.ts (1)

3-19: Configuration file looks good.

The retry logic and resource settings provide a solid base. Confirm that 3600 seconds as maxDuration meets SLAs and memory constraints for your tasks.

packages/core/src/v3/links.ts (1)

18-21: Thanks for adding concurrency docs reference.

Linking to this new section will help developers find guidance on managing potential deadlocks.

apps/webapp/app/v3/services/enqueueRun.server.ts (3)

6-10: LGTM! Well-structured type definition for options.

The EnqueueRunOptions type is well-defined with clear parameters and optional dependency handling.


12-20: LGTM! Clear result type definition using discriminated union.

The EnqueueRunResult type effectively uses a discriminated union to handle both success and error cases.
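
For readers unfamiliar with the pattern, a discriminated union of this kind typically looks like the sketch below. The field names are hypothetical; the real EnqueueRunResult may differ.

```typescript
// Hypothetical result shape, discriminated on the `status` field.
type EnqueueResult =
  | { status: "ok"; runId: string }
  | { status: "error"; error: string };

// Checking `status` narrows the type, so each branch only sees
// the fields that exist for that variant.
function describeResult(result: EnqueueResult): string {
  if (result.status === "ok") {
    return `enqueued ${result.runId}`;
  }
  return `failed: ${result.error}`;
}
```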


50-61: LGTM! Clear error handling for deadlock case.

The error handling for the deadlock case is well-implemented with a clear error message.

apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (1)

8-10: LGTM! Good use of Zod for parameter validation.

The schema definition ensures type safety for the runParam.

apps/webapp/app/components/runs/v3/BatchStatus.tsx (2)

7-15: LGTM! Clear addition of ABORTED status.

The "ABORTED" status is well-integrated with clear description.


42-60: LGTM! Consistent icon handling for new status.

The BatchStatusIcon component handles the "ABORTED" status consistently with other statuses.

apps/webapp/app/v3/services/enqueueDelayedRun.server.ts (1)

105-111: LGTM! Clean integration with new enqueueRun function.

The integration with the new enqueueRun function is well-implemented, handling dependencies correctly.

apps/webapp/app/v3/marqs/types.ts (3)

6-18: Well-structured queue and environment types!

The QueueDescriptor and MarQSKeyProducerEnv types are well-designed, providing clear separation of concerns and all necessary fields for queue and environment management.


20-71: Well-designed interface updates for environment-centric queue management!

The MarQSKeyProducer interface changes effectively support the transition to environment-centric concurrency management. The method overloads provide good flexibility for queue key generation.


73-83: Good enhancement to fair queue distribution!

The EnvQueues type and updated MarQSFairDequeueStrategy interface effectively support environment-level fair queue distribution.

apps/webapp/test/utils/marqs.ts (2)

33-46: LGTM!

The setupQueue function correctly uses the updated MarQSKeyProducer interface.


48-52: Good shift to environment-centric concurrency options!

The SetupConcurrencyOptions type effectively supports the transition from organization to environment-level concurrency management.

apps/webapp/app/v3/marqs/concurrencyMonitor.server.ts (1)

111-113:

❓ Verification inconclusive

Verify the removal of organization-level concurrency checks.

The removal of organization-level concurrency monitoring aligns with the shift to environment-centric management. However, please verify that this doesn't impact existing workloads.


🏁 Script executed:

#!/bin/bash
# Search for any remaining references to organization-level concurrency
rg -i "org.*concurrency" --type ts

Length of output: 4026


Confirm removal of org-level concurrency monitoring and validate test expectations

The updated implementation now relies exclusively on environment- and queue-level concurrency checks. However, note that legacy references to organization-level keys still exist (e.g., in the tests within apps/webapp/test/marqsKeyProducer.test.ts). Please verify that:

  • The removal of organization-level checks in concurrencyMonitor.server.ts is intentional and doesn’t negatively impact workload management.
  • The remaining org-prefixed key patterns in tests are used solely for naming consistency (or similar purposes) and are not indicative of active concurrency logic.
  • Any outdated comments in the code (mentioning “org, env, queue”) are updated to accurately reflect the current design.
apps/webapp/app/presenters/v3/BatchListPresenter.server.ts (1)

193-193: LGTM! Improved batch status handling.

The change to consider any non-pending state as finished is more robust and correctly handles all possible batch states, including the new "ABORTED" state.

references/v3-catalog/src/trigger/simple.ts (1)

144-229: LGTM! Removed development and testing tasks.

The removal of parentTask and childTask is appropriate as they were primarily used for development and testing purposes. This cleanup helps maintain a cleaner codebase.

apps/webapp/app/v3/marqs/v2.server.ts (1)

12-12: LGTM! Updated import path.

The import path update correctly reflects the new location of MarQSShortKeyProducer implementation.

apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2)

3-21: LGTM! Well-organized constants and regex patterns.

The constants and regex patterns are well-structured and provide a clear foundation for key generation.


23-130: LGTM! Clean implementation of core key generation methods.

The implementation is robust with good handling of different environment types and clear method signatures.

apps/webapp/app/v3/services/createCheckpoint.server.ts (1)

177-178: LGTM! Enhanced message priority for task resumption.

The addition of MarQSPriorityLevel.resume improves the message handling by explicitly setting the priority level for resuming tasks after a duration.

packages/core/src/v3/schemas/common.ts (1)

175-175: LGTM! Enhanced error handling for recursive wait deadlocks.

The addition of "RECURSIVE_WAIT_DEADLOCK" to the error codes improves the system's ability to handle and report deadlock scenarios in recursive wait operations.

apps/webapp/app/presenters/v3/SpanPresenter.server.ts (3)

121-123: LGTM! Optimized query by directly fetching required fields.

The addition of output, outputType, and error fields to the query improves efficiency by reducing the need for additional data fetching.


168-174: LGTM! Simplified output handling logic.

The refactored output handling is more concise and maintainable, with clear conditional checks and type handling.


184-194: LGTM! Improved error handling with better type safety.

The error handling logic is now more robust with proper type checking and fallback to custom error format.

apps/webapp/app/v3/services/resumeBatchRun.server.ts (2)

255-273: LGTM! Enhanced message replacement with proper timestamps and priority.

The message replacement logic now correctly handles:

  • Queue timestamps with proper fallback to creation time
  • Priority levels for resumed tasks
  • Comprehensive task metadata

191-210:

❓ Verification inconclusive

Address TODO comment and verify priority queue implementation.

The TODO comment suggests that the priority queue implementation might need further attention. However, the current implementation with MarQSPriorityLevel.resume looks correct.

Run this script to verify other usages of the priority queue:


🏁 Script executed:

#!/bin/bash
# Search for other TODO comments related to priority queue
rg -A 5 "TODO.*priority queue"

# Search for other uses of MarQSPriorityLevel
ast-grep --pattern 'MarQSPriorityLevel.$_'

Length of output: 3683


Verify Priority Queue Implementation Consistency

The use of MarQSPriorityLevel.resume in the code (apps/webapp/app/v3/services/resumeBatchRun.server.ts, lines 191–210) is consistent with other parts of the codebase (for example, in resumeTaskDependency.server.ts). Our search confirms that the priority queue usage across the system is uniform. Although a TODO comment still exists (“TODO: use the new priority queue thingie”), this appears to be a noted future improvement rather than an indication of a defect in the current implementation.

  • Action: Either update the TODO comment with details on an implementation plan for the new priority queue or remove it if the current approach is intended to remain.
  • File(s) Impacted:
    • apps/webapp/app/v3/services/resumeBatchRun.server.ts
    • apps/webapp/app/v3/services/resumeTaskDependency.server.ts (similar TODO present)
apps/webapp/app/env.server.ts (1)

360-360:

❓ Verification inconclusive

Verify the impact of removing organization limits.

The removal of MARQS_MAXIMUM_ORG_COUNT in favor of MARQS_MAXIMUM_ENV_COUNT might affect existing deployments.


🏁 Script executed:

#!/bin/bash
# Description: Check for usage of the removed MARQS_MAXIMUM_ORG_COUNT in the codebase

echo "Checking for references to MARQS_MAXIMUM_ORG_COUNT..."
rg "MARQS_MAXIMUM_ORG_COUNT"

echo "Checking for environment variables in deployment configurations..."
fd -t f "deployment|docker-compose|k8s|env" -x grep -l "MARQS_MAXIMUM_ORG_COUNT" {} \;

Length of output: 378


Action Required: Manually Verify External Impacts of Removing Organization Limits

The updated code at apps/webapp/app/env.server.ts replaces the organization limit with an environment limit. Automated searches did not reveal any remaining references to MARQS_MAXIMUM_ORG_COUNT in the codebase. However, automated detection can miss external deployment configurations, so manually verify that no external services or configuration files (for example, deployment, Docker Compose, or Kubernetes setups) still depend on the removed variable.

  • File Affected: apps/webapp/app/env.server.ts (line 360)
  • Code Context:
      MARQS_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
  • Action Items:
    • Confirm that no deployment or environment configuration file references MARQS_MAXIMUM_ORG_COUNT.
    • Ensure that any external integrations or services previously using the organization limit are updated accordingly.
apps/webapp/app/v3/services/completeAttempt.server.ts (1)

25-25:

❓ Verification inconclusive

Verify the impact of retry priority changes.

The addition of MarQSPriorityLevel.retry might affect the order of message processing.


🏁 Script executed:

#!/bin/bash
# Description: Check for other uses of MarQSPriorityLevel in the codebase

echo "Checking for MarQSPriorityLevel usage..."
rg "MarQSPriorityLevel" -A 5

echo "Checking for message priority handling in MarQS implementation..."
ast-grep --pattern $'class MarQS {
  $$$
  replaceMessage($_, $_, $_, $_) {
    $$$
  }
  $$$
}'

Length of output: 6897


Action Required: Confirm Ordering Impact of the New Retry Priority

completeAttempt.server.ts now uses MarQSPriorityLevel.retry (value 10), which is numerically lower than MarQSPriorityLevel.resume (100). Please verify that this ordering, where retry messages are processed with a lower priority than resume messages, is intentional and does not introduce any side effects in the MarQS message processing logic. Notably:

  • Ensure the change in numeric value correctly reflects the intended processing order.
  • Review the overall message handling flow in MarQS to confirm that adding a "retry" level does not inadvertently alter expected behavior in other parts of the system.
packages/core/src/v3/errors.ts (3)

238-238: LGTM! Appropriate placement of the new error code.

The RECURSIVE_WAIT_DEADLOCK error code is correctly placed with other non-retryable errors, as recursive deadlocks cannot be resolved through retries.


516-523: LGTM! Well-structured error definition with clear messaging.

The error definition includes:

  • A clear and descriptive message explaining the deadlock situation
  • A helpful link to documentation for users to learn more about the issue

916-931: LGTM! Comprehensive error string formatting.

The function:

  • Handles all error types exhaustively
  • Provides consistent string formatting
  • Uses clear and concise implementation
apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx (2)

20-20: LGTM! Import follows project conventions.

The AdminDebugRun component import is correctly placed and follows the project's import conventions.


794-794: LGTM! Proper integration of AdminDebugRun component.

The component is:

  • Correctly placed in the footer section
  • Properly configured with the required friendlyId prop
apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam/route.tsx (1)

207-207: LGTM! Cleaner button text.

The removal of ellipsis from the button text creates a cleaner and more modern UI appearance.

apps/webapp/app/v3/eventRepository.server.ts (3)

98-99: LGTM! Well-designed EventBuilder enhancements.

The new methods:

  • stop(): Provides control over event tracing
  • failWithError(error: TaskRunError): Enables proper error handling

921-923: LGTM! Robust event state tracking.

The new state flags:

  • isStopped: Tracks if event tracing has been stopped
  • failedWithError: Captures any errors during event processing

988-989: LGTM! Comprehensive error state handling.

The event creation:

  • Properly sets isPartial and isError flags based on error state
  • Updates status appropriately
  • Includes error details in events when applicable

Also applies to: 995-995, 1023-1033

internal-packages/database/prisma/migrations/20250211150836_add_aborted_batch_task_run_status/migration.sql (3)

1-3: New Enum Value Added: 'ABORTED'

The migration adds a new value 'ABORTED' to the existing BatchTaskRunStatus enum. This change appears aligned with expanding the batch run status possibilities for more granular error handling in the concurrency system.

Consider verifying that any code comparing enum values (or relying on their order) is updated accordingly.
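
One way to surface comparisons that miss the new value is an exhaustive switch. The sketch below is illustrative only: ABORTED comes from the migration, while the other union members and the `batchStatusLabel` helper are assumptions made for the example, not code from this PR.

```typescript
// Hypothetical status union: only ABORTED is confirmed by the migration;
// PENDING and COMPLETED are assumed for illustration.
type BatchTaskRunStatus = "PENDING" | "COMPLETED" | "ABORTED";

// Hypothetical helper: the `never` assignment in the default branch makes the
// compiler reject this function if a status is added but not handled.
function batchStatusLabel(status: BatchTaskRunStatus): string {
  switch (status) {
    case "PENDING":
      return "In progress";
    case "COMPLETED":
      return "Completed";
    case "ABORTED":
      return "Aborted";
    default: {
      const unhandled: never = status;
      throw new Error(`Unhandled batch status: ${String(unhandled)}`);
    }
  }
}
```

With this pattern, adding another enum value later produces a compile-time error wherever a case is missing, rather than a silent fall-through at runtime.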


4-6: Dropping the Existing Index on SecretStore

The script drops the index "SecretStore_key_idx". Please double-check that this index is not actively used in production queries during the migration and that the downtime (if any) is planned.


7-9: Recreating the SecretStore Index with text_pattern_ops

Recreating the index on "SecretStore" using the operator class text_pattern_ops is a good measure for optimizing text pattern lookups. Make sure that all environments (and PostgreSQL versions used) support this operator class consistently.

internal-packages/database/prisma/migrations/20250210164232_add_queue_timestamp_to_run/migration.sql (2)

1-3: Re-Dropping the SecretStore Index

Again, the migration drops "SecretStore_key_idx". This is consistent with the approach in the other migration. Ensure that both migrations run in the correct order so that the final state of the index is correct.


7-9: Recreating the SecretStore Index with text_pattern_ops (Again)

Similar to the previous migration file, the index is re-created using the text_pattern_ops operator. This consistency helps ensure optimized pattern searches.

docs/queue-concurrency.mdx (7)

6-7: Clarifying Task Execution and Queuing

The introduction now clearly explains that when you trigger a task, it is not executed immediately but instead enqueued for later execution. This explanation sets the stage for the detailed concurrency discussion.


15-18: Informative Note on Environment Concurrency Limits

The added <Note> block provides valuable context regarding maximum concurrency limits based on the subscription plan. The link for contacting support is a nice touch for customer guidance.


20-21: ‘Setting task concurrency’ Heading

The new section header “## Setting task concurrency” is clear. It sets up the subsequent code example well.


37-37: Explanatory Text for Shared Access

The sentence added at (or around) line 37 clarifies that sharing a queue is useful for controlling access to shared resources (e.g. a database or API). This explanation improves the documentation’s clarity.


39-39: ‘Sharing concurrency between tasks’ Section

The section header “## Sharing concurrency between tasks” properly introduces the concept. The accompanying code snippet (with the example from /trigger/queue.ts) clearly demonstrates how multiple tasks can share a concurrency limit.


66-67: Highlighting Shared Queue Behavior

The added note (line 66) explains that when tasks share the same queue, only one can run at a time given the current limit. This is an essential point for understanding task coordination.


68-69: ‘Concurrency and subtasks’ Section

The updated section underscores that subtasks do not automatically inherit their parent task’s queue settings. This distinction is very useful for developers designing task hierarchies.

internal-packages/database/prisma/schema.prisma (1)

2211-2214: Enum Update: Inclusion of ‘ABORTED’ in BatchTaskRunStatus

The BatchTaskRunStatus enum now includes an ABORTED value. This matches the migration change and helps reflect the new batch status logic.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🔭 Outside diff range comments (2)
apps/webapp/app/v3/marqs/index.server.ts (1)

486-515: 🛠️ Refactor suggestion

Add error handling for message acknowledgment failures.

The acknowledgeMessage method should handle potential Redis operation failures more gracefully.

Add error handling:

 public async acknowledgeMessage(messageId: string, reason: string = "unknown") {
   return this.#trace(
     "acknowledgeMessage",
     async (span) => {
       const message = await this.readMessage(messageId);

       if (!message) {
         logger.log(`[${this.name}].acknowledgeMessage() message not found`, {
           messageId,
           service: this.name,
           reason,
         });
         return;
       }

+      try {
         span.setAttributes({
           [SemanticAttributes.QUEUE]: message.queue,
           [SemanticAttributes.MESSAGE_ID]: message.messageId,
           [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
           [SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
           ["marqs.reason"]: reason,
         });

         await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);

         await this.#callAcknowledgeMessage({
           parentQueue: message.parentQueue,
           messageQueue: message.queue,
           messageId,
         });

         await this.options.subscriber?.messageAcked(message);
+      } catch (error) {
+        logger.error(`Failed to acknowledge message`, {
+          messageId,
+          error,
+          service: this.name,
+          reason,
+        });
+        throw error;
+      }
     },
🧰 Tools
🪛 Biome (1.9.4)

[error] 489-489: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

references/v3-catalog/src/trigger/concurrency.ts (1)

153-258: 🛠️ Refactor suggestion

Use SDK's wait utility and add documentation for the test scenario.

  1. Replace all instances of setTimeout with the SDK's wait.for() utility.
  2. Consider adding documentation to explain the test scenario and task hierarchy.
+/**
+ * Test scenario for queue priority in task hierarchies:
+ * - Parent task triggers child task
+ * - Child task can trigger grandchild task based on propagate flag
+ * - All tasks use the same queue with concurrency limit of 1
+ * - Demonstrates priority handling for resuming/retrying tasks
+ */
 export const testChildTaskPriorityController = task({

The implementation effectively tests queue priority for task hierarchies, which aligns with the PR objectives.

🧹 Nitpick comments (34)
apps/webapp/app/v3/services/resumeTaskDependency.server.ts (2)

3-3: Remove unused import.

The MarQS type is imported but never used in this file.

-import { MarQS, marqs, MarQSPriorityLevel } from "~/v3/marqs/index.server";
+import { marqs, MarQSPriorityLevel } from "~/v3/marqs/index.server";

53-53: Remove outdated TODO comment.

The TODO comment about using "the new priority queue thingie" is outdated since the priority queue implementation is already in place with MarQSPriorityLevel.resume.

-      // TODO: use the new priority queue thingie
apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts (1)

27-88: Consider adding JSDoc comments for the sorting logic.

The sorting implementation is robust, maintaining stability for equal priorities and properly handling queue groups. However, the complex logic would benefit from documentation explaining the sorting strategy.

Add JSDoc comments to explain the sorting strategy:

+/**
+ * Sorts queues within each environment based on priority.
+ * Maintains original order for queues with equal priority.
+ * @param envs Array of environment queues
+ * @returns Sorted array of environment queues
+ */
 #sortQueuesInEnvironmentsByPriority(envs: EnvQueues[]): EnvQueues[] {
   return envs.map((env) => {
     return this.#sortQueuesInEnvironmentByPriority(env);
   });
 }

+/**
+ * Sorts queues within an environment based on priority.
+ * Groups queues by base name and selects highest priority queue from each group.
+ * Maintains original order for queues with equal priority.
+ * @param env Environment queues to sort
+ * @returns Environment with sorted queues
+ */
 #sortQueuesInEnvironmentByPriority(env: EnvQueues): EnvQueues {
apps/webapp/app/v3/marqs/types.ts (1)

20-71: Consider grouping related methods in the interface.

The interface is comprehensive but could benefit from logical grouping of related methods using JSDoc comments.

Add method grouping comments:

 export interface MarQSKeyProducer {
+  // Queue-related key methods
   queueConcurrencyLimitKey(env: MarQSKeyProducerEnv, queue: string): string;
   
+  // Environment-related key methods
   envConcurrencyLimitKey(envId: string): string;
   envConcurrencyLimitKey(env: MarQSKeyProducerEnv): string;
   
+  // Message-related key methods
   messageKey(messageId: string): string;
   nackCounterKey(messageId: string): string;
apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2)

220-260: Consider adding input validation for queueDescriptorFromQueue.

While the method handles missing components well, it should also validate the format of individual components.

Add validation for component formats:

 queueDescriptorFromQueue(queue: string): QueueDescriptor {
+  if (!queue || typeof queue !== 'string') {
+    throw new Error('Queue must be a non-empty string');
+  }
+
   const match = queue.match(QUEUE_REGEX);
   
   if (!match) {
     throw new Error(`Invalid queue: ${queue}, no queue name found`);
   }
   
   const [, queueName] = match;
+  
+  if (!queueName || queueName.length === 0) {
+    throw new Error(`Invalid queue: ${queue}, queue name cannot be empty`);
+  }

262-285: Consider adding JSDoc for private helper methods.

The private helper methods would benefit from documentation explaining their purpose and parameters.

Add JSDoc comments:

+/**
+ * Shortens an ID by taking its last 12 characters.
+ * @param id The full ID to shorten
+ * @returns The shortened ID
+ */
 private shortId(id: string) {
   return id.slice(-12);
 }

+/**
+ * Creates an environment key section.
+ * @param envId Environment ID
+ * @returns Formatted environment key section
+ */
 private envKeySection(envId: string) {
   return `${constants.ENV_PART}:${this.shortId(envId)}`;
 }
apps/webapp/test/marqsKeyProducer.test.ts (2)

131-162: Add more error case tests for queueDescriptorFromQueue.

While basic error cases are covered, consider adding tests for empty strings and invalid component formats.

Add additional test cases:

 describe("queueDescriptorFromQueue", () => {
+  it("should throw error for empty queue string", () => {
+    expect(() => producer.queueDescriptorFromQueue("")).toThrow("Queue must be a non-empty string");
+  });
+
+  it("should throw error for queue with empty name", () => {
+    expect(() => producer.queueDescriptorFromQueue("org:123:env:456:queue:")).toThrow(
+      "queue name cannot be empty"
+    );
+  });

176-222: Consider more descriptive test case names.

While the test cases are comprehensive, their names could be more descriptive of the specific scenarios being tested.

Improve test case names:

-it("should generate correct current concurrency key", () => {
+it("should generate current concurrency key with basic queue name", () => {

-it("should include concurrency key when provided", () => {
+it("should include concurrency key in current concurrency key when provided", () => {

-it("should remove the priority bit when provided", () => {
+it("should exclude priority from current concurrency key when priority is present", () => {
apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts (3)

155-194: Consider adding error handling for edge cases in #shuffleQueuesByEnv.

The method could encounter issues if:

  1. maxLimit calculation results in -Infinity (when envs array is empty)
  2. Weighted calculations with very large numbers could cause precision issues

Consider adding these safeguards:

 const maxLimit = Math.max(...envs.map((envId) => snapshot.envs[envId].concurrency.limit));
+// Handle empty envs case
+if (!Number.isFinite(maxLimit) || maxLimit <= 0) {
+  return [];
+}

 const weightedEnvs: WeightedEnv[] = envs.map((envId) => {
   const env = snapshot.envs[envId];
   let weight = 1;

   if (biases.concurrencyLimitBias > 0) {
-    const normalizedLimit = env.concurrency.limit / maxLimit;
+    // Prevent division by zero and ensure normalized values
+    const normalizedLimit = maxLimit > 0 ? env.concurrency.limit / maxLimit : 0;
     weight *= 1 + Math.pow(normalizedLimit * biases.concurrencyLimitBias, 2);
   }
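
As a small illustration of why the empty-array guard matters: Math.max called with no arguments returns -Infinity, so spreading an empty list of limits yields a non-finite value. The helper name below is hypothetical and only sketches the check in isolation.

```typescript
// Hypothetical helper: returns undefined for an empty list instead of letting
// Math.max(...[]) produce -Infinity.
function maxConcurrencyLimit(limits: number[]): number | undefined {
  if (limits.length === 0) {
    return undefined;
  }
  return Math.max(...limits);
}
```

Returning undefined forces the caller to handle the empty case explicitly, which is one alternative to the Number.isFinite check in the suggestion above.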

487-490: Remove unnecessary continue statement.

The continue statement at the end of the catch block is unnecessary as it's the last statement in the loop.

 } catch (error) {
   // Log error but continue trying other queues
   logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, { error });
-  continue;
 }

1799-1865: Consider extracting Redis configuration into a separate function.

The Redis configuration setup in getMarQSClient is quite lengthy and could be made more maintainable.

Consider refactoring like this:

function createRedisConfig() {
  if (!env.REDIS_HOST || !env.REDIS_PORT) {
    throw new Error(
      "Could not initialize Trigger.dev because process.env.REDIS_HOST and process.env.REDIS_PORT are required to be set."
    );
  }

  return {
    keyPrefix: KEY_PREFIX,
    port: env.REDIS_PORT,
    host: env.REDIS_HOST,
    username: env.REDIS_USERNAME,
    password: env.REDIS_PASSWORD,
    enableAutoPipelining: true,
    ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
  };
}

function getMarQSClient() {
  const redis = new Redis(createRedisConfig());
  const keysProducer = new MarQSShortKeyProducer(KEY_PREFIX);
  
  return new MarQS({
    // ... rest of the configuration
  });
}
apps/webapp/app/v3/marqs/index.server.ts (2)

1288-1670: Consider splitting Redis command definitions into a separate file.

The Redis command definitions section is quite large and could be better maintained in a dedicated file.

Consider creating a new file redisCommands.ts to house these definitions, improving maintainability and separation of concerns.


1107-1119: Consider reducing debug logging verbosity.

The debug logging in the Redis command calls is quite verbose and could impact performance in production.

Consider adding a verbosity level check:

+const shouldLogVerbose = process.env.NODE_ENV !== 'production' || this.options.verbose;

 logger.debug("Calling dequeueMessage", {
+  ...(shouldLogVerbose ? {
     messageQueue,
     parentQueue,
     queueConcurrencyLimitKey,
     envConcurrencyLimitKey,
     queueCurrentConcurrencyKey,
     queueReserveConcurrencyKey,
     envCurrentConcurrencyKey,
     envReserveConcurrencyKey,
     envQueueKey,
     service: this.name,
+  } : {
+    messageQueue,
+    service: this.name,
+  })
 });
references/v3-catalog/src/trigger/concurrency.ts (2)

19-56: Consider adding JSDoc documentation.

The task implementation looks good and aligns with testing the reserve concurrency system. Consider adding JSDoc documentation to explain the purpose of the parameters and the test scenario.

 export const testConcurrency = task({
+  /**
+   * Tests the reserve concurrency system by triggering multiple parent tasks
+   * that create child tasks, demonstrating queue priority and concurrency management.
+   * @param count - Number of tasks to trigger in batch
+   * @param delay - Delay for parent task execution
+   * @param childDelay - Delay for child task execution
+   */
   id: "test-concurrency-controller",

260-289: Document queue sharing and potential deadlock scenario.

The implementation demonstrates queue sharing between tasks, which is a key scenario mentioned in the PR objectives. However, this setup could potentially lead to a Recursive Queue Deadlock when tasks in the same queue wait for each other.

Consider adding documentation to:

  1. Explain the purpose of sharing the same queue
  2. Warn about potential deadlock scenarios
  3. Describe how the reserve concurrency system handles this
 export const myQueue = queue({
+  /**
+   * Queue shared between parent, subtask, and subsubtask to demonstrate:
+   * 1. Queue priority for resuming/retrying tasks
+   * 2. Reserve concurrency system handling recursive waits
+   * Note: Tasks sharing the same queue with triggerAndWait pattern
+   * could lead to deadlock without proper reserve concurrency handling.
+   */
   name: "my-queue",
   concurrencyLimit: 1,
 });
apps/webapp/app/v3/services/enqueueRun.server.ts (1)

21-66: Consider adding error handling for edge cases.

The function handles the basic recursive wait deadlock case, but there might be other edge cases to consider:

  1. What happens if marqs.enqueueMessage throws an error?
  2. What happens if the environment or run objects are invalid?

Consider adding try-catch blocks and input validation:

 export async function enqueueRun({
   env,
   run,
   dependentRun,
 }: EnqueueRunOptions): Promise<EnqueueRunResult> {
+  // Validate input
+  if (!env || !run) {
+    return {
+      ok: false,
+      error: {
+        type: "INTERNAL_ERROR",
+        code: TaskRunErrorCodes.CONFIGURED_INCORRECTLY,
+        message: "Invalid environment or run object",
+      },
+    };
+  }
+
+  try {
     const wasEnqueued = await marqs.enqueueMessage(
       env,
       run.queue,
       run.id,
       {
         type: "EXECUTE",
         taskIdentifier: run.taskIdentifier,
         projectId: env.projectId,
         environmentId: env.id,
         environmentType: env.type,
       },
       run.concurrencyKey ?? undefined,
       run.queueTimestamp ?? undefined,
       dependentRun
         ? { messageId: dependentRun.id, recursiveQueue: dependentRun.queue === run.queue }
         : undefined
     );

     if (!wasEnqueued) {
       const error = {
         type: "INTERNAL_ERROR",
         code: TaskRunErrorCodes.RECURSIVE_WAIT_DEADLOCK,
         message: `This run will never execute because it was triggered recursively and the task has no remaining concurrency available`,
       } satisfies TaskRunError;

       return {
         ok: false,
         error,
       };
     }

     return {
       ok: true,
     };
+  } catch (error) {
+    return {
+      ok: false,
+      error: {
+        type: "INTERNAL_ERROR",
+        code: TaskRunErrorCodes.TASK_EXECUTION_FAILED,
+        message: `Failed to enqueue run: ${error instanceof Error ? error.message : String(error)}`,
+      },
+    };
+  }
 }
apps/webapp/app/v3/services/resumeBatchRun.server.ts (3)

191-210: Consider adding retry logic for message enqueuing.

The message enqueuing operation could fail due to transient issues. Consider implementing a retry mechanism with exponential backoff.

Consider wrapping the enqueue operation in a retry function:

async function retryEnqueueMessage(
  marqs: any,
  environment: any,
  queue: string,
  messageId: string,
  payload: any,
  concurrencyKey?: string,
  queueTimestamp?: Date,
  dependentRun?: any,
  priority?: MarQSPriorityLevel,
  maxRetries = 3
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await marqs.enqueueMessage(
        environment,
        queue,
        messageId,
        payload,
        concurrencyKey,
        queueTimestamp,
        dependentRun,
        priority
      );
    } catch (error) {
      if (attempt === maxRetries) throw error;
      await new Promise(resolve => setTimeout(resolve, Math.pow(2, attempt) * 100));
    }
  }
  return false;
}

255-273: Consider adding validation for timestamp conversion.

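The timestamp conversion using getTime() returns NaN when the date is invalid (it only throws if the value is not a Date at all), so an unusable timestamp could be passed to the queue silently.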

Add validation before converting the timestamp:

-          (
-            dependentTaskAttempt.taskRun.queueTimestamp ?? dependentTaskAttempt.taskRun.createdAt
-          ).getTime(),
+          (() => {
+            const timestamp =
+              dependentTaskAttempt.taskRun.queueTimestamp ?? dependentTaskAttempt.taskRun.createdAt;
+            if (!(timestamp instanceof Date) || isNaN(timestamp.getTime())) {
+              logger.warn("Invalid timestamp for message replacement", {
+                timestamp,
+                dependentTaskAttemptId: dependentTaskAttempt.id,
+              });
+              return Date.now();
+            }
+            return timestamp.getTime();
+          })(),

191-210: Remove or update the informal TODO comment.

The TODO comment uses informal language ("thingie"). Either remove it since the priority queue implementation is already in place, or update it with a more professional description.

-        // TODO: use the new priority queue thingie

The enqueueMessage parameters are correctly updated with timestamp and priority level.

packages/core/src/v3/errors.ts (1)

916-931: Consider adding test coverage for error string conversion.

The new taskRunErrorToString function handles all error types but lacks test coverage.

Consider adding tests to verify the string conversion for each error type:

describe('taskRunErrorToString', () => {
  it('should format internal errors', () => {
    const error: TaskRunError = {
      type: "INTERNAL_ERROR",
      code: "RECURSIVE_WAIT_DEADLOCK",
      message: "Test message"
    };
    expect(taskRunErrorToString(error)).toBe("Internal error [RECURSIVE_WAIT_DEADLOCK]: Test message");
  });

  it('should format built-in errors', () => {
    const error: TaskRunError = {
      type: "BUILT_IN_ERROR",
      name: "TypeError",
      message: "Test error"
    };
    expect(taskRunErrorToString(error)).toBe("TypeError: Test error");
  });

  // Add more test cases for STRING_ERROR and CUSTOM_ERROR
});
apps/webapp/app/v3/services/triggerTask.server.ts (2)

569-595: Consider adding logging for enqueue failures.

The error handling for enqueue failures could benefit from detailed logging.

Add logging to track enqueue failures:

 if (!enqueueResult.ok) {
+  logger.error("Failed to enqueue run", {
+    runId: run.id,
+    friendlyId: run.friendlyId,
+    error: enqueueResult.error,
+    taskId: run.taskIdentifier,
+    queue: run.queue,
+    environment: {
+      id: environment.id,
+      type: environment.type
+    }
+  });
+
   await this._prisma.taskRun.update({
     where: { id: run.id },
     data: {

602-617: Consider adding transaction rollback on error.

When throwing a ServiceValidationError, consider rolling back any database changes.

Add transaction rollback:

 if (result?.error) {
+  await this._prisma.$transaction(async (tx) => {
+    // Rollback any changes made to the run
+    await tx.taskRun.delete({
+      where: { id: result.run.id }
+    });
+  });
+
   throw new ServiceValidationError(
     taskRunErrorToString(taskRunErrorEnhancer(result.error))
   );
 }
references/test-tasks/src/utils.ts (3)

6-32: Consider validating the returned run object and adding a progressive backoff strategy

  1. If runs.retrieve(id) returns null or some unexpected shape, the code might silently error when accessing run.status. You may want to confirm the run is non-null before checking its status.
  2. Instead of a fixed 1-second delay, a progressive or exponential backoff can reduce API load while waiting for long-running tasks.
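
Both points can be sketched together as follows. This is a minimal sketch, not the utility from the PR: `RunLike`, `backoffDelayMs`, and `waitForStatus` are hypothetical names, `retrieve` stands in for runs.retrieve(id), and the base delay, cap, and attempt count are assumed values.

```typescript
// Minimal shape of a run for this sketch; the real SDK type has more fields.
interface RunLike {
  status: string;
}

// Delay before the given attempt (0-based): doubles each time, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical replacement for the polling loop. `sleep` is injectable so the
// loop can be exercised without real waiting.
async function waitForStatus(
  retrieve: () => Promise<RunLike | null | undefined>,
  targetStatuses: string[],
  maxAttempts = 10,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<RunLike> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const run = await retrieve();
    // Fail loudly instead of reading `status` from a missing run.
    if (!run) {
      throw new Error("Run not found while polling for status");
    }
    if (targetStatuses.includes(run.status)) {
      return run;
    }
    await sleep(backoffDelayMs(attempt));
  }
  throw new Error(
    `Run did not reach ${targetStatuses.join(", ")} after ${maxAttempts} attempts`
  );
}
```

Capping the delay keeps long-running tasks from being polled too rarely, while the doubling keeps short tasks responsive.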

34-70: Validate concurrency limit before making the request

Currently, updateEnvironmentConcurrencyLimit passes concurrencyLimit directly through. You could:

  1. Check if concurrencyLimit is ≥ 1 (or whichever minimum applies).
  2. Enforce an upper bound if relevant.
  3. Use a small Zod schema to keep the logic aligned with your other validated requests.
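
A minimal sketch of the first two checks, written without Zod so it stays self-contained. The function name and both bounds are assumptions for illustration; the real minimum and maximum depend on what the API accepts.

```typescript
// Hypothetical bounds: the real minimum and maximum depend on the platform.
const MIN_CONCURRENCY_LIMIT = 1;
const MAX_CONCURRENCY_LIMIT = 1000;

// Hypothetical validator: returns the limit unchanged when it is acceptable,
// otherwise throws before any request is made.
function assertValidConcurrencyLimit(concurrencyLimit: number): number {
  if (!Number.isInteger(concurrencyLimit)) {
    throw new RangeError(`concurrencyLimit must be an integer, got ${concurrencyLimit}`);
  }
  if (concurrencyLimit < MIN_CONCURRENCY_LIMIT || concurrencyLimit > MAX_CONCURRENCY_LIMIT) {
    throw new RangeError(
      `concurrencyLimit must be between ${MIN_CONCURRENCY_LIMIT} and ${MAX_CONCURRENCY_LIMIT}, got ${concurrencyLimit}`
    );
  }
  return concurrencyLimit;
}
```

Calling this at the top of updateEnvironmentConcurrencyLimit would reject bad input locally rather than relying on the server response.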

82-109: Ensure consistency with environment variables and handle errors gracefully

getEnvironmentStats relies on TRIGGER_API_URL and TRIGGER_ACCESS_TOKEN but does not explicitly validate their presence. Consider:

  1. Verifying these variables (similar to updateEnvironmentConcurrencyLimit).
  2. Providing a clearer error message if they are undefined.
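
One possible shape for that check is a small lookup helper. `requireEnv` is a hypothetical name, and the environment is passed in as a plain object so the sketch does not depend on Node typings.

```typescript
// Hypothetical helper: reads a required variable and fails with a clear
// message when it is missing or blank.
function requireEnv(
  name: string,
  source: Record<string, string | undefined>
): string {
  const value = source[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Example call site (assumes a Node environment):
//   const apiUrl = requireEnv("TRIGGER_API_URL", process.env);
//   const accessToken = requireEnv("TRIGGER_ACCESS_TOKEN", process.env);
```

Using the same helper in both getEnvironmentStats and updateEnvironmentConcurrencyLimit would keep the two functions consistent.
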
references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (4)

9-54: Consider error-isolation or cleanup across multiple tests

describeReserveConcurrencySystem runs many sub-tests sequentially. If a sub-test fails partway, some concurrency changes or states may linger. You might:

  • Implement a higher-level try/finally to reset concurrency states.
  • Isolate each sub-test in separate contexts if that suits your testing approach.
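
The try/finally option could look roughly like this. It is a sketch under assumptions: `withConcurrencyLimit` is a hypothetical wrapper, and `setLimit` stands in for whatever call the tests use to change the environment limit (for example, updateEnvironmentConcurrencyLimit).

```typescript
// Hypothetical helper: applies a temporary limit, runs the test body, and
// always restores the original limit, even if the body throws.
async function withConcurrencyLimit<T>(
  setLimit: (limit: number) => Promise<void>,
  temporaryLimit: number,
  originalLimit: number,
  body: () => Promise<T>
): Promise<T> {
  await setLimit(temporaryLimit);
  try {
    return await body();
  } finally {
    await setLimit(originalLimit);
  }
}
```

Wrapping each sub-test this way means a failure partway through still leaves the environment at its original limit for the next sub-test.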

367-411: Guard against infinite recursion where depth could accidentally be large

recursiveTask depends on depth to terminate. Consider adding a safe upper bound or a sanity check to prevent unbounded recursion if inputs are malformed.
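
A minimal sketch of such a sanity check, assuming `depth` is a non-negative integer payload field. The cap and the helper name are hypothetical and would need to match the deepest hierarchy the tests actually exercise.

```typescript
// Hypothetical cap; pick a value that matches the deepest hierarchy the tests need.
const MAX_RECURSION_DEPTH = 10;

// Hypothetical guard: rejects malformed or excessive depth values before the
// task triggers any children.
function assertSafeDepth(depth: number, maxDepth = MAX_RECURSION_DEPTH): number {
  if (!Number.isInteger(depth) || depth < 0 || depth > maxDepth) {
    throw new RangeError(
      `depth must be an integer between 0 and ${maxDepth}, got ${depth}`
    );
  }
  return depth;
}
```

Running the guard at the start of the task's run function would stop a malformed payload at the first level instead of after many recursive triggers.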


464-494: Code duplication across resumeParentTask and genericParentTask

Both tasks share similar logic for triggering child tasks and using batch or single runs. Consider extracting common logic into a helper to adhere to DRY principles.


503-530: Similar logic to resumeParentTask could be factored out

genericParentTask duplicates much of the same code path. Centralizing shared logic might improve maintainability.

apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (2)

68-70: Optional queue parameter schema is good but could be extended

You might:

  1. Validate that queue matches a known format (if you have naming conventions).
  2. Provide helpful error messages for invalid queue inputs.

72-145: Enhance loader error handling & null checks

  1. The function calls marqs.getEnvConcurrencyLimit and related methods; consider verifying marqs is defined, and handle potential connection or runtime errors.
  2. If concurrency lookups fail, providing more descriptive error info can help debug.
apps/webapp/app/components/admin/debugRun.tsx (1)

80-340: Consider refactoring repeated Property.Item components.

The code contains multiple Property.Item components with similar structure. Consider creating a utility function or map-based approach to generate these items dynamically:

+type DebugProperty = {
+  label: string;
+  value: string;
+  iconButton?: boolean;
+};
+
+function PropertyItem({ label, value, iconButton = true }: DebugProperty) {
+  return (
+    <Property.Item>
+      <Property.Label>{label}</Property.Label>
+      <Property.Value className="flex items-center gap-2">
+        <ClipboardField
+          value={value}
+          variant="tertiary/small"
+          iconButton={iconButton}
+        />
+      </Property.Value>
+    </Property.Item>
+  );
+}

-<Property.Item>
-  <Property.Label>Queue key</Property.Label>
-  <Property.Value className="flex items-center gap-2">
-    <ClipboardField
-      value={withPrefix(
-        keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
-      )}
-      variant="tertiary/small"
-      iconButton
-    />
-  </Property.Value>
-</Property.Item>
+<PropertyItem
+  label="Queue key"
+  value={withPrefix(
+    keys.queueKey(run.runtimeEnvironment, run.queue, run.concurrencyKey ?? undefined)
+  )}
+/>

This refactoring would:

  1. Reduce code duplication
  2. Make the code more maintainable
  3. Reduce the risk of typos

docs/queue-concurrency.mdx (2)

70-71: Add commas after introductory phrases.

For improved readability, add commas after introductory phrases:

-When you trigger a task you can override the concurrency limit.
+When you trigger a task, you can override the concurrency limit.

-For example purposes let's say the environment concurrency limit is 1.
+For example purposes, let's say the environment concurrency limit is 1.

Also applies to: 219-220

🧰 Tools
🪛 LanguageTool

[typographical] ~70-~70: Consider inserting a comma for improved readability.
Context: ...u trigger a run When you trigger a task you can override the concurrency limit. Thi...

(INITIAL_ADVP_COMMA)


257-258: Consider rephrasing for stronger emphasis.

Instead of using "very important", consider a stronger phrasing:

-It's very important to note that we only release at-most X slots
+It is crucial to note that we only release at-most X slots
🧰 Tools
🪛 LanguageTool

[style] ~257-~257: As an alternative to the over-used intensifier ‘very’, consider replacing this phrase.
Context: ...e parent task slot is reoccupied. It's very important to note that we only release at-most X ...

(EN_WEAK_ADJECTIVE)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 93b21a4 and 84f5563.

⛔ Files ignored due to path filters (2)
  • docs/images/recursive-task-deadlock-min.png is excluded by !**/*.png
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (47)
  • apps/webapp/app/components/admin/debugRun.tsx (1 hunks)
  • apps/webapp/app/components/runs/v3/BatchStatus.tsx (4 hunks)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/BatchListPresenter.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts (4 hunks)
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam/route.tsx (3 hunks)
  • apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (2 hunks)
  • apps/webapp/app/routes/projects.v3.$projectRef.metrics/registerProjectMetrics.server.ts (0 hunks)
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx (2 hunks)
  • apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts (1 hunks)
  • apps/webapp/app/v3/eventRepository.server.ts (5 hunks)
  • apps/webapp/app/v3/marqs/concurrencyMonitor.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts (12 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (25 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts (0 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.ts (1 hunks)
  • apps/webapp/app/v3/marqs/types.ts (2 hunks)
  • apps/webapp/app/v3/marqs/v2.server.ts (1 hunks)
  • apps/webapp/app/v3/services/batchTriggerV3.server.ts (1 hunks)
  • apps/webapp/app/v3/services/completeAttempt.server.ts (3 hunks)
  • apps/webapp/app/v3/services/createCheckpoint.server.ts (3 hunks)
  • apps/webapp/app/v3/services/enqueueDelayedRun.server.ts (3 hunks)
  • apps/webapp/app/v3/services/enqueueRun.server.ts (1 hunks)
  • apps/webapp/app/v3/services/resumeBatchRun.server.ts (5 hunks)
  • apps/webapp/app/v3/services/resumeTaskDependency.server.ts (4 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (7 hunks)
  • apps/webapp/test/envPriorityDequeueingStrategy.test.ts (1 hunks)
  • apps/webapp/test/fairDequeuingStrategy.test.ts (13 hunks)
  • apps/webapp/test/marqsKeyProducer.test.ts (1 hunks)
  • apps/webapp/test/utils/marqs.ts (3 hunks)
  • docs/queue-concurrency.mdx (6 hunks)
  • internal-packages/database/prisma/migrations/20250210164232_add_queue_timestamp_to_run/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20250211150836_add_aborted_batch_task_run_status/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (2 hunks)
  • packages/core/src/v3/errors.ts (4 hunks)
  • packages/core/src/v3/links.ts (1 hunks)
  • packages/core/src/v3/schemas/common.ts (1 hunks)
  • packages/core/src/v3/utils/flattenAttributes.ts (2 hunks)
  • references/test-tasks/package.json (1 hunks)
  • references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (1 hunks)
  • references/test-tasks/src/utils.ts (1 hunks)
  • references/test-tasks/trigger.config.ts (1 hunks)
  • references/test-tasks/tsconfig.json (1 hunks)
  • references/v3-catalog/src/trigger/concurrency.ts (3 hunks)
  • references/v3-catalog/src/trigger/simple.ts (1 hunks)
  • references/v3-catalog/trigger.config.ts (1 hunks)
💤 Files with no reviewable changes (2)
  • apps/webapp/app/routes/projects.v3.$projectRef.metrics/registerProjectMetrics.server.ts
  • apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
🚧 Files skipped from review as they are similar to previous changes (24)
  • internal-packages/database/prisma/migrations/20250211150836_add_aborted_batch_task_run_status/migration.sql
  • internal-packages/database/prisma/migrations/20250210164232_add_queue_timestamp_to_run/migration.sql
  • apps/webapp/app/v3/marqs/concurrencyMonitor.server.ts
  • references/v3-catalog/trigger.config.ts
  • apps/webapp/app/v3/services/completeAttempt.server.ts
  • references/test-tasks/tsconfig.json
  • apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts
  • apps/webapp/app/v3/services/createCheckpoint.server.ts
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx
  • references/v3-catalog/src/trigger/simple.ts
  • packages/core/src/v3/schemas/common.ts
  • apps/webapp/test/utils/marqs.ts
  • apps/webapp/app/v3/services/enqueueDelayedRun.server.ts
  • apps/webapp/app/presenters/v3/BatchListPresenter.server.ts
  • apps/webapp/app/env.server.ts
  • references/test-tasks/package.json
  • packages/core/src/v3/utils/flattenAttributes.ts
  • internal-packages/database/prisma/schema.prisma
  • apps/webapp/app/v3/marqs/v2.server.ts
  • packages/core/src/v3/links.ts
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts
  • apps/webapp/app/components/runs/v3/BatchStatus.tsx
  • references/test-tasks/trigger.config.ts
  • apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam/route.tsx
🧰 Additional context used
🪛 LanguageTool
docs/queue-concurrency.mdx

[typographical] ~219-~219: After the expression ‘for example’ a comma is usually used.
Context: ...yload) => { //... }, }); ``` For example purposes, let's say the environment con...

(COMMA_FOR_EXAMPLE)


🪛 Biome (1.9.4)
apps/webapp/app/v3/marqs/index.server.ts

[error] 489-489: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (41)
apps/webapp/app/v3/services/resumeTaskDependency.server.ts (2)

69-71: LGTM! Well-implemented priority queue system.

The implementation correctly:

  • Uses the queueTimestamp with fallback to createdAt
  • Sets the appropriate priority level for resuming tasks

This aligns with the PR objective of prioritizing resumed runs.
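
For readers skimming the review, the fallback pattern described above can be sketched as follows. The `RunTimestamps` shape, the `ExamplePriority` enum, and `buildResumeMessageOptions` are illustrative assumptions; the actual `MarQSPriorityLevel` values and the message shape are not shown in this review.

```typescript
// Hypothetical run shape: queueTimestamp is inherited from the root run when present.
type RunTimestamps = { createdAt: Date; queueTimestamp: Date | null };

// Hypothetical priority levels, standing in for the real MarQSPriorityLevel.
enum ExamplePriority {
  normal = 0,
  resume = 1,
  retry = 2,
}

// Prefer the propagated queue timestamp; fall back to the run's own creation time.
function buildResumeMessageOptions(run: RunTimestamps): {
  timestamp: number;
  priority: ExamplePriority;
} {
  const timestamp = (run.queueTimestamp ?? run.createdAt).getTime();
  return { timestamp, priority: ExamplePriority.resume };
}
```

Using `??` rather than `||` keeps the fallback limited to `null` and `undefined`, which matters if a timestamp is ever stored as a number that could be `0`.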

104-105: LGTM! Consistent priority implementation.

The message replacement logic maintains consistency with the enqueueing implementation by:

  • Using the same timestamp fallback strategy
  • Setting the same priority level for resuming tasks

apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts (3)

1-13: LGTM! Clean initialization using delegate pattern.

The class properly implements the interface and initializes the delegate through constructor injection.


15-25: LGTM! Clear and focused method implementation.

The method correctly delegates base functionality and applies priority sorting.


90-94: LGTM! Clear priority extraction with sensible default.

The method properly extracts priority from queue descriptor with a default of 0.
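
A rough sketch of the default-and-sort behaviour described in these comments. The `ExampleQueueDescriptor` type and both function names are assumptions for illustration, and "higher priority dequeues first" is assumed rather than confirmed by the review.

```typescript
// Hypothetical descriptor: only the fields needed for this sketch.
type ExampleQueueDescriptor = { name: string; priority?: number };

// A queue with no explicit priority is treated as priority 0.
function getPriority(descriptor: ExampleQueueDescriptor): number {
  return descriptor.priority ?? 0;
}

// Returns a new array ordered by descending priority. Array.prototype.sort is
// stable, so queues with equal priority keep the order the delegate produced.
function sortByPriority(queues: ExampleQueueDescriptor[]): ExampleQueueDescriptor[] {
  return [...queues].sort((a, b) => getPriority(b) - getPriority(a));
}
```

Copying before sorting leaves the delegate's result untouched, which keeps the wrapper free of side effects.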

apps/webapp/app/v3/marqs/types.ts (4)

6-12: LGTM! Well-structured queue descriptor type.

The type properly captures all necessary queue attributes with appropriate optional fields.


14-18: LGTM! Focused environment type definition.

The type efficiently captures required environment information.


73-83: LGTM! Clear and focused strategy types.

The types effectively capture the requirements for environment-based queue distribution.


85-114: LGTM! Well-defined message types with zod schema.

The message types are properly defined with clear validation schema.

apps/webapp/app/v3/marqs/marqsKeyProducer.ts (1)

1-22: LGTM! Well-organized constants and regex patterns.

Constants and regex patterns are clearly defined and maintainable.

apps/webapp/test/envPriorityDequeueingStrategy.test.ts (1)

1-389: Well-structured and comprehensive test suite!

The test suite provides excellent coverage with:

  • Clear test cases for various queue prioritization scenarios
  • Good edge case handling (empty queues, negative priorities)
  • Proper verification of queue ordering and priority handling
  • Comprehensive testing of concurrency key interactions

apps/webapp/test/fairDequeuingStrategy.test.ts (1)

1-879: Excellent test coverage and implementation!

The test suite demonstrates:

  • Thorough testing of queue distribution logic
  • Good coverage of concurrency and environment handling
  • Clear test data setup and assertions
  • Proper edge case handling

references/v3-catalog/src/trigger/concurrency.ts (1)

1-17: LGTM!

The import statement has been updated to include the queue functionality, and the oneAtATime task implementation remains clean and correct.

apps/webapp/app/v3/services/enqueueRun.server.ts (1)

1-10: LGTM!

The imports and type definitions are well-structured and appropriate for the functionality.

apps/webapp/app/v3/services/resumeBatchRun.server.ts (5)

3-3: LGTM!

The import of MarQSPriorityLevel is appropriate for the new priority queue functionality.


155-156: LGTM!

The addition of createdAt and queueTimestamp fields is necessary for proper timestamp handling in the queue.


3-3: LGTM!

The MarQSPriorityLevel import is correctly added and used consistently throughout the file.


155-156: LGTM!

The addition of createdAt and queueTimestamp fields is necessary for the new priority queue functionality and is correctly implemented.


255-273: LGTM!

The replaceMessage implementation correctly handles:

  • Filtering of taskRunAttemptIds
  • Timestamp calculation with proper fallback
  • Priority level setting

packages/core/src/v3/errors.ts (3)

238-239: LGTM!

Correctly marks recursive wait deadlock as a non-retryable error.


516-523: LGTM!

The error definition is clear and includes a helpful documentation link.


684-689: LGTM!

Properly handles recursive wait deadlock in exception event enhancement.

apps/webapp/app/v3/services/triggerTask.server.ts (1)

376-380: LGTM!

The queueTimestamp logic correctly prioritizes timestamps from dependent runs.

references/test-tasks/src/utils.ts (2)

4-4: Good approach defining RunStatus via Awaited<ReturnType<typeof runs.retrieve>>

This type inference technique conveniently stays in sync with any structural changes in runs.retrieve, reducing maintenance overhead.
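
To illustrate the technique in isolation, here is a small sketch with a stand-in `retrieveRun` function. It is hypothetical and only mimics the idea of an async retrieval call; it is not the SDK's `runs.retrieve`.

```typescript
// Stand-in for an SDK call whose return type we do not want to restate by hand.
async function retrieveRun(id: string) {
  return {
    id,
    status: "COMPLETED" as "COMPLETED" | "FAILED" | "EXECUTING",
    durationMs: 0,
  };
}

// ReturnType yields Promise<{...}>; Awaited unwraps it to the resolved object type.
type RunStatus = Awaited<ReturnType<typeof retrieveRun>>;

// Any helper typed against RunStatus follows changes to retrieveRun automatically.
function isFinished(run: RunStatus): boolean {
  return run.status === "COMPLETED" || run.status === "FAILED";
}
```

`Awaited` requires TypeScript 4.5 or later.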


72-80: Zod schema for environment stats looks good

The schema comprehensively covers concurrency fields. This ensures consistent API responses across the system.

references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (12)

56-123: Logic for retry priority test is comprehensive

The flow (failure run → hold run → queued run) effectively ensures that failed runs get priority before new ones. No issues noted here.


125-173: Resume priority test is well-structured

The approach of checking that the resumed run completes before the newly queued run is clear and verifies the concurrency mechanics well.


175-233: Solid test design for duration-based resume priority

By branching for DEVELOPMENT vs. other environments, you ensure minimal overhead locally while testing concurrency logic thoroughly in other stages.


235-317: Check concurrency limit restoration

If testEnvReserveConcurrency fails midway, the concurrency limit remains altered due to the final updateEnvironmentConcurrencyLimit(ctx.environment.id, 100) call not being reached. Consider wrapping concurrency modifications in a try/finally block to ensure the environment concurrency limit is always restored, preventing side effects in subsequent tests.
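
One way the try/finally suggestion could look, as a sketch only. `withTemporaryConcurrencyLimit` is a hypothetical helper, and `setLimit` stands in for a call such as `updateEnvironmentConcurrencyLimit(ctx.environment.id, limit)`, whose exact signature is assumed here.

```typescript
// Runs `body` with a temporary concurrency limit and always restores the
// original limit afterwards, even when `body` throws.
async function withTemporaryConcurrencyLimit<T>(
  setLimit: (limit: number) => Promise<void>, // assumed wrapper around the real update call
  temporaryLimit: number,
  restoredLimit: number,
  body: () => Promise<T>
): Promise<T> {
  await setLimit(temporaryLimit);
  try {
    return await body();
  } finally {
    // Reached on both success and failure, so later tests see the restored limit.
    await setLimit(restoredLimit);
  }
}
```

The test body would then move into the `body` callback, with `100` passed as `restoredLimit` to match the value the current code restores at the end.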


319-365: Queue reserve concurrency test effectively verifies correct resource allocation

The logic of running a shallow recursion (depth=1) vs. a deeper recursion (depth=2) shows how concurrency is reserved and eventually fails when the limit is exceeded. Looks good.


412-415: Single-queue definition is straightforward

A concurrency limit of 1 ensures mutual exclusion for tasks referencing this queue. No issues noted.


417-425: Simple delay task is clear and correct

It accurately simulates long-running work by sleeping with setTimeout. Implementation is fine.


427-442: Retry logic is well-handled

retryTask gracefully retries up to 10 times, setting a 5-second interval. This is suitable for test scenarios.


444-462: Flexible wait approach meets concurrency testing needs

The toggling between wait.for and manual setTimeout underlines different concurrency scenarios. Code reads nicely.


496-501: Child task is straightforward

No improvements needed; it simply delays as specified.


532-538: Batch result unwrapping looks good

This utility elegantly detects failed runs and reports the first encountered error. Straightforward and effective.
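
For context, a utility of this kind might look roughly like the sketch below. The `ExampleBatchRun` shape and `unwrapBatchOutputs` are assumptions made for illustration, and throwing on the first failure is one possible way to "report" it; the PR's actual helper may differ.

```typescript
// Hypothetical per-run result: either a successful output or an error.
type ExampleBatchRun<T> = { ok: true; output: T } | { ok: false; error: unknown };

// Collects outputs in order and throws on the first failed run encountered.
function unwrapBatchOutputs<T>(runs: ExampleBatchRun<T>[]): T[] {
  const outputs: T[] = [];
  for (const run of runs) {
    if (!run.ok) {
      throw new Error(`Batch run failed: ${String(run.error)}`);
    }
    outputs.push(run.output);
  }
  return outputs;
}
```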


540-545: Simple child task with consistent pattern

Implementation is minimal, focusing on a single delay. No concerns here.

apps/webapp/app/components/admin/debugRun.tsx (1)

14-20: LGTM! Good security check implementation.

The component correctly checks for admin access and impersonation status before rendering, providing a secure way to restrict access to debug functionality.

apps/webapp/app/v3/services/batchTriggerV3.server.ts (1)

459-470: LGTM! Improved error handling.

The changes enhance error handling by:

  1. Directly updating batch status to "ABORTED"
  2. Setting completedAt timestamp
  3. Properly propagating the error after status update

This provides better visibility into failed batches and ensures accurate timing information.

apps/webapp/app/v3/eventRepository.server.ts (1)

98-99: LGTM! Enhanced error handling capabilities.

The changes add robust error handling to the event builder:

  1. New stop() method to halt event processing
  2. New failWithError() method for error reporting
  3. Proper event property updates based on error state
  4. Automatic exception event generation on failure

This provides better visibility into failed events and ensures accurate error reporting.

Also applies to: 921-923, 988-989, 995-995, 1023-1033

docs/queue-concurrency.mdx (1)

6-7: LGTM! Clear introduction to task queuing.

The introduction clearly explains that tasks are queued rather than executed immediately, and explains the default unbounded concurrency behavior.

@ericallam ericallam merged commit 7b1159e into main Feb 19, 2025
12 checks passed
@ericallam ericallam deleted the run-engine-v1-concurrency-fixes branch February 19, 2025 11:43
// to free up concurrency for the children to run
// In the case of a recursive queue, reserving concurrency can fail, which means there is a deadlock and we need to fail the run

// TODO: reserveConcurrency can fail because of a deadlock, we need to handle that case

Is this an issue?
